From 8ce0f35d752e0f135ef75ee8fcfbea1e2b9cbc92 Mon Sep 17 00:00:00 2001 From: Pavel Minaev Date: Mon, 8 Jan 2024 21:48:29 -0800 Subject: [PATCH 1/3] Implement #1430: [sys.monitoring] Conditional breakpoints Implement #1431: [sys.monitoring] Hit-conditional breakpoints Implement #1432: [sys.monitoring] Logpoints --- src/debugpy/common/util.py | 1 + src/debugpy/server/__init__.py | 5 + src/debugpy/server/adapters.py | 61 +-- src/debugpy/server/eval.py | 19 +- src/debugpy/server/tracing.py | 540 ------------------------- src/debugpy/server/tracing/__init__.py | 531 ++++++++++++++++++++++++ src/debugpy/server/tracing/tracer.py | 417 +++++++++++++++++++ 7 files changed, 998 insertions(+), 576 deletions(-) delete mode 100644 src/debugpy/server/tracing.py create mode 100644 src/debugpy/server/tracing/__init__.py create mode 100644 src/debugpy/server/tracing/tracer.py diff --git a/src/debugpy/common/util.py b/src/debugpy/common/util.py index 54850a07b..57dfea808 100644 --- a/src/debugpy/common/util.py +++ b/src/debugpy/common/util.py @@ -160,5 +160,6 @@ def hide_thread_from_debugger(thread): DEBUGPY_TRACE_DEBUGPY is used to debug debugpy with debugpy """ if hide_debugpy_internals(): + thread.is_debugpy_thread = True thread.pydev_do_not_trace = True thread.is_pydev_daemon_thread = True diff --git a/src/debugpy/server/__init__.py b/src/debugpy/server/__init__.py index c5abcc84b..e4233bc4b 100644 --- a/src/debugpy/server/__init__.py +++ b/src/debugpy/server/__init__.py @@ -4,6 +4,11 @@ def adapter(): + """ + Returns the instance of Adapter corresponding to the debug adapter that is currently + connected to this process, or None if there is no adapter connected. Use in lieu of + Adapter.instance to avoid import cycles. 
+ """ from debugpy.server.adapters import Adapter return Adapter.instance diff --git a/src/debugpy/server/adapters.py b/src/debugpy/server/adapters.py index 0c9cf2dea..c913f0a4d 100644 --- a/src/debugpy/server/adapters.py +++ b/src/debugpy/server/adapters.py @@ -9,9 +9,9 @@ from debugpy.adapter import components from debugpy.common import json, log, messaging, sockets -from debugpy.common.messaging import Request -from debugpy.server import tracing, eval -from debugpy.server.tracing import Breakpoint, StackFrame +from debugpy.common.messaging import MessageDict, Request +from debugpy.server import eval +from debugpy.server.tracing import Breakpoint, StackFrame, Thread, Tracer class Adapter: @@ -50,13 +50,13 @@ class Expectations(components.Capabilities): server_access_token = None """Access token that the adapter must use to authenticate with this server.""" - _is_initialized: bool = False _has_started: bool = False _client_id: str = None _capabilities: Capabilities = None _expectations: Expectations = None _start_request: messaging.Request = None + _tracer: Tracer = None def __init__(self, stream: messaging.JsonIOStream): self._is_initialized = False @@ -65,6 +65,7 @@ def __init__(self, stream: messaging.JsonIOStream): self._capabilities = None self._expectations = None self._start_request = None + self._tracer = Tracer.instance self.channel = messaging.JsonMessageChannel(stream, self) self.channel.start() @@ -139,6 +140,8 @@ def initialize_request(self, request: Request): ] return { + "exceptionBreakpointFilters": exception_breakpoint_filters, + "supportsClipboardContext": True, "supportsCompletionsRequest": True, "supportsConditionalBreakpoints": True, "supportsConfigurationDoneRequest": True, @@ -148,17 +151,15 @@ def initialize_request(self, request: Request): "supportsExceptionInfoRequest": True, "supportsExceptionOptions": True, "supportsFunctionBreakpoints": True, + "supportsGotoTargetsRequest": True, "supportsHitConditionalBreakpoints": True, 
"supportsLogPoints": True, "supportsModulesRequest": True, "supportsSetExpression": True, "supportsSetVariable": True, - "supportsValueFormattingOptions": True, - "supportsTerminateRequest": True, - "supportsGotoTargetsRequest": True, - "supportsClipboardContext": True, - "exceptionBreakpointFilters": exception_breakpoint_filters, "supportsStepInTargetsRequest": True, + "supportsTerminateRequest": True, + "supportsValueFormattingOptions": True, } def _handle_start_request(self, request: Request): @@ -189,7 +190,7 @@ def configurationDone_request(self, request: Request): 'or an "attach" request' ) - tracing.start() + self._tracer.start() self._has_started = True request.respond({}) @@ -233,20 +234,28 @@ def setBreakpoints_request(self, request: Request): bps = list(request("breakpoints", json.array(json.object()))) else: lines = request("lines", json.array(int)) - bps = [{"line": line} for line in lines] + bps = [MessageDict(request, {"line": line}) for line in lines] Breakpoint.clear([path]) - bps_set = [Breakpoint.set(path, bp["line"]) for bp in bps] + bps_set = [ + Breakpoint.set( + path, bp["line"], + condition=bp("condition", str, optional=True), + hit_condition=bp("hitCondition", str, optional=True), + log_message=bp("logMessage", str, optional=True), + ) + for bp in bps + ] return {"breakpoints": bps_set} def threads_request(self, request: Request): - return {"threads": tracing.Thread.enumerate()} + return {"threads": Thread.enumerate()} def stackTrace_request(self, request: Request): thread_id = request("threadId", int) start_frame = request("startFrame", 0) - thread = tracing.Thread.get(thread_id) + thread = Thread.get(thread_id) if thread is None: raise request.isnt_valid(f'Invalid "threadId": {thread_id}') @@ -265,7 +274,7 @@ def pause_request(self, request: Request): thread_ids = None else: thread_ids = [request("threadId", int)] - tracing.pause(thread_ids) + self._tracer.pause(thread_ids) return {} def continue_request(self, request: Request): @@ 
-274,25 +283,25 @@ def continue_request(self, request: Request): else: thread_ids = [request("threadId", int)] single_thread = request("singleThread", False) - tracing.resume(thread_ids if single_thread else None) + self._tracer.resume(thread_ids if single_thread else None) return {} def stepIn_request(self, request: Request): # TODO: support "singleThread" and "granularity" thread_id = request("threadId", int) - tracing.step_in(thread_id) + self._tracer.step_in(thread_id) return {} def stepOut_request(self, request: Request): # TODO: support "singleThread" and "granularity" thread_id = request("threadId", int) - tracing.step_out(thread_id) + self._tracer.step_out(thread_id) return {} def next_request(self, request: Request): # TODO: support "singleThread" and "granularity" thread_id = request("threadId", int) - tracing.step_over(thread_id) + self._tracer.step_over(thread_id) return {} def scopes_request(self, request: Request): @@ -316,18 +325,18 @@ def evaluate_request(self, request: Request): return {"result": var.repr, "variablesReference": var.id} def disconnect_request(self, request: Request): - tracing.Breakpoint.clear() - tracing.abandon_step() - tracing.resume() + Breakpoint.clear() + self._tracer.abandon_step() + self._tracer.resume() return {} def terminate_request(self, request: Request): - tracing.Breakpoint.clear() - tracing.abandon_step() - tracing.resume() + Breakpoint.clear() + self._tracer.abandon_step() + self._tracer.resume() return {} def disconnect(self): - tracing.resume() + self._tracer.resume() self.connected_event.clear() return {} diff --git a/src/debugpy/server/eval.py b/src/debugpy/server/eval.py index c9fe4d1d6..cfa1e8db6 100644 --- a/src/debugpy/server/eval.py +++ b/src/debugpy/server/eval.py @@ -2,29 +2,28 @@ # Licensed under the MIT License. See LICENSE in the project root # for license information. 
+import debugpy import threading - from collections.abc import Iterable +from debugpy.server.inspect import inspect from types import FrameType from typing import ClassVar, Dict, Literal, Self -from debugpy.server import tracing -from debugpy.server.inspect import inspect - -ScopeKind = Literal["global", "nonlocal", "local"] +type ScopeKind = Literal["global", "nonlocal", "local"] +type StackFrame = "debugpy.server.tracing.StackFrame" _lock = threading.RLock() class VariableContainer: - frame: "tracing.StackFrame" + frame: StackFrame id: int _last_id: ClassVar[int] = 0 _all: ClassVar[Dict[int, "VariableContainer"]] = {} - def __init__(self, frame: "tracing.StackFrame"): + def __init__(self, frame: StackFrame): self.frame = frame with _lock: VariableContainer._last_id += 1 @@ -46,7 +45,7 @@ def variables(self) -> Iterable["Variable"]: raise NotImplementedError @classmethod - def invalidate(self, *frames: Iterable["tracing.StackFrame"]) -> None: + def invalidate(self, *frames: Iterable[StackFrame]) -> None: with _lock: ids = [ id @@ -61,7 +60,7 @@ class Scope(VariableContainer): frame: FrameType kind: ScopeKind - def __init__(self, frame: "tracing.StackFrame", kind: ScopeKind): + def __init__(self, frame: StackFrame, kind: ScopeKind): super().__init__(frame) self.kind = kind @@ -92,7 +91,7 @@ class Variable(VariableContainer): value: object # TODO: evaluateName, memoryReference, presentationHint - def __init__(self, frame: "tracing.StackFrame", name: str, value: object): + def __init__(self, frame: StackFrame, name: str, value: object): super().__init__(frame) self.name = name self.value = value diff --git a/src/debugpy/server/tracing.py b/src/debugpy/server/tracing.py deleted file mode 100644 index 007f5b781..000000000 --- a/src/debugpy/server/tracing.py +++ /dev/null @@ -1,540 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See LICENSE in the project root -# for license information. 
- -import inspect -import sys -import threading -import traceback - -from contextlib import contextmanager -from collections import defaultdict -from dataclasses import dataclass, field -from pathlib import Path -from sys import monitoring -from types import CodeType, FrameType -from typing import ClassVar, Dict, Iterable, List, Literal, Union - -from debugpy.server import adapter -from debugpy.server.eval import Scope, VariableContainer - -# Shared for all global state pertaining to breakpoints and stepping. -_cvar = threading.Condition() - -# IDs of threads that are currently pausing or paused. -_pause_ids = set() - -_steps = {} - - -@contextmanager -def cvar(who): - #print(end=f"ACQUIRING {who}\n") - with _cvar: - #print(end=f"ACQUIRED {who}\n") - yield - #print(end=f"RELEASING {who}\n") - #print(end=f"RELEASED {who}\n") - - -@dataclass -class Thread: - id: int = field(init=False) - thread: threading.Thread - - def __post_init__(self): - # TODO: map 32-bit DAP thread IDs to (potentially) 64-bit Python thread IDs. - # Otherwise, large thread IDs (common on Linux) will be truncated when they are serialized as JSON. 
- self.id = self.thread.ident - - def __getstate__(self): - return { - "id": self.id, - "name": self.thread.name, - } - - @property - def is_traced(self): - return not getattr(self.thread, "pydev_do_not_trace", False) - - @property - def name(self): - return self.thread.name - - @classmethod - def enumerate(self) -> List["Thread"]: - return [ - thread - for t in threading.enumerate() - for thread in [Thread(t)] - if thread.is_traced - ] - - @classmethod - def get(self, id: int) -> Union["Thread", None]: - for thread in self.enumerate(): - if thread.id == id: - return thread - return None - - def stack_trace(self) -> Iterable["StackFrame"]: - try: - (fobj,) = (fobj for (id, fobj) in sys._current_frames().items() if id == self.id) - except ValueError: - raise ValueError(f"Can't get frames for inactive Thread({self.id})") - for fobj, _ in traceback.walk_stack(fobj): - frame = StackFrame.from_frame_object(self, fobj) - if not frame.is_internal(): - yield frame - - -@dataclass -class StackFrame: - thread: Thread - frame_object: FrameType - - id: int = field(init=False) - _path: Path = field(init=False) - _scopes: List[Scope] = field(init=False, default=None) - - _last_id: ClassVar[int] = 0 - _all: ClassVar[Dict[int, "StackFrame"]] = {} - - def __post_init__(self): - StackFrame._last_id += 1 - self.id = StackFrame._last_id - self._path = None - self._all[self.id] = self - - def __getstate__(self): - return { - "id": self.id, - "name": self.frame_object.f_code.co_name, - "source": { - # TODO: use "sourceReference" when path isn't available (e.g. 
decompiled code) - "path": str(self.path()), - }, - "line": self.frame_object.f_lineno, - "column": 1, # TODO - # TODO: "endLine", "endColumn", "moduleId", "instructionPointerReference" - } - - @property - def line(self) -> int: - return self.frame_object.f_lineno - - def path(self) -> Path: - if self._path is None: - path = Path(self.frame_object.f_code.co_filename) - try: - path = path.resolve() - except (OSError, RuntimeError): - pass - # No need to sync this. - self._path = path - return self._path - - def is_internal(self) -> bool: - # TODO: filter internal frames properly - parts = self.path().parts - internals = ["debugpy", "threading"] - return any(part.startswith(s) for s in internals for part in parts) - - @classmethod - def get(self, id: int) -> "StackFrame": - return self._all.get(id, None) - - @classmethod - def from_frame_object(self, thread: Thread, frame_object: FrameType) -> "StackFrame": - for frame in self._all.values(): - if frame.thread.id == thread.id and frame.frame_object is frame_object: - return frame - return StackFrame(thread, frame_object) - - def scopes(self) -> List[Scope]: - if self._scopes is None: - self._scopes = [ - Scope(self.frame_object, "local"), - Scope(self.frame_object, "global"), - ] - return self._scopes - - @classmethod - def invalidate(self, thread_id: int): - frames = [frame for frame in self._all.values() if frame.thread.id == thread_id] - VariableContainer.invalidate(*frames) - - -@dataclass -class Step: - step: Literal["in", "out", "over"] - origin: FrameType = None - origin_line: int = None - - -@dataclass -class Breakpoint: - path: Path - line: int - is_enabled: bool = True - - id: int = field(init=False) - - _last_id: ClassVar[int] = 0 - - _all: ClassVar[Dict[int, "Breakpoint"]] = {} - - _at: ClassVar[Dict[Path, Dict[int, List["Breakpoint"]]]] = defaultdict( - lambda: defaultdict(lambda: []) - ) - - def __post_init__(self): - Breakpoint._last_id += 1 - self.id = Breakpoint._last_id - with cvar(1): - 
self._all[self.id] = self - self._at[self.path][self.line].append(self) - _cvar.notify_all() - - def __getstate__(self): - return { - "line": self.line, - "verified": True, # TODO - } - - def is_hit(self, frame: StackFrame): - with cvar(2): - # Check path last since path resolution is potentially expensive. - return ( - self.is_enabled - and frame.line == self.line - and frame.path() == self.path - ) - - @classmethod - def at(self, path: str, line: int) -> List["Breakpoint"]: - with cvar(3): - return self._at[path][line] - - @classmethod - def clear(self, paths: Iterable[str] = None): - #print("clear-bp", paths) - if paths is not None: - paths = [Path(path).resolve() for path in paths] - with cvar(4): - if paths is None: - paths = list(self._at.keys()) - for path in paths: - bps_in = self._at.pop(path, {}).values() - for bps_at in bps_in: - for bp in bps_at: - del self._all[bp.id] - _cvar.notify_all() - monitoring.restart_events() - - @classmethod - def set(self, path: str, line: int) -> "Breakpoint": - try: - path = Path(path).resolve() - except (OSError, RuntimeError): - pass - #print("set-bp", path, line) - bp = Breakpoint(path, line) - monitoring.restart_events() - return bp - - def enable(self, is_enabled: bool): - with cvar(5): - self.is_enabled = is_enabled - _cvar.notify_all() - - -def start(): - for thread in Thread.enumerate(): - adapter().channel.send_event( - "thread", - { - "reason": "started", - "threadId": thread.id, - "name": thread.name, - }, - ) - - monitoring.use_tool_id(monitoring.DEBUGGER_ID, "debugpy") - monitoring.set_events( - monitoring.DEBUGGER_ID, - ( - monitoring.events.LINE - | monitoring.events.PY_START - | monitoring.events.PY_RETURN - | monitoring.events.PY_RESUME - | monitoring.events.PY_YIELD - | monitoring.events.PY_THROW - | monitoring.events.PY_UNWIND - | monitoring.events.RAISE - | monitoring.events.RERAISE - | monitoring.events.EXCEPTION_HANDLED - ), - ) - - trace_funcs = { - monitoring.events.LINE: _trace_line, - 
monitoring.events.PY_START: _trace_py_start, - monitoring.events.PY_RESUME: _trace_py_resume, - monitoring.events.PY_RETURN: _trace_py_return, - monitoring.events.PY_YIELD: _trace_py_yield, - monitoring.events.PY_THROW: _trace_py_throw, - monitoring.events.PY_UNWIND: _trace_py_unwind, - monitoring.events.RAISE: _trace_raise, - monitoring.events.RERAISE: _trace_reraise, - monitoring.events.EXCEPTION_HANDLED: _trace_exception_handled, - } - for event, func in trace_funcs.items(): - monitoring.register_callback(monitoring.DEBUGGER_ID, event, func) - - -def pause(thread_ids: List[int] = None): - #print(f"PAUSE {thread_ids=}") - if thread_ids is None: - thread_ids = [thread.id for thread in Thread.enumerate()] - - # TODO: handle race between the above and new threads starting when doing pause-the-world. - with cvar(6): - _pause_ids.update(thread_ids) - _cvar.notify_all() - monitoring.restart_events() - - -def resume(thread_ids: List[int] = None): - #print(f"RESUME {thread_ids=}") - with cvar(7): - if thread_ids is None: - _pause_ids.clear() - else: - _pause_ids.difference_update(thread_ids) - _cvar.notify_all() - monitoring.restart_events() - - -def abandon_step(thread_ids: List[int] = None): - #print(f"ABANDON_STEP {thread_ids=}") - with cvar(8): - if thread_ids is None: - thread_ids = [thread.id for thread in Thread.enumerate()] - for thread_id in thread_ids: - _steps.pop(thread_id, None) - _cvar.notify_all() - monitoring.restart_events() - - -def step_in(thread_id: int): - with cvar(9): - _steps[thread_id] = Step("in") - _pause_ids.clear() - _cvar.notify_all() - monitoring.restart_events() - - -def step_out(thread_id: int): - with cvar(10): - _steps[thread_id] = Step("out") - _pause_ids.clear() - _cvar.notify_all() - monitoring.restart_events() - - -def step_over(thread_id: int): - with cvar(11): - _steps[thread_id] = Step("over") - _pause_ids.clear() - _cvar.notify_all() - monitoring.restart_events() - - -# On shutdown, modules go away (become None), but _trace_line 
is still invoked. -DISABLE = monitoring.DISABLE - - -def _stop(frame_obj: FrameType, reason: str, hit_breakpoints: Iterable[Breakpoint] = ()): - thread_id = threading.get_ident() - #print(f"STOP {thread_id=}, {reason=}, {hit_breakpoints=}") - with cvar(12): - if thread_id not in _pause_ids: - #print("STOP: not paused") - return - - #print("SENDING...") - adapter().channel.send_event( - "stopped", - { - "reason": reason, - "threadId": threading.get_ident(), - "allThreadsStopped": False, # TODO - "hitBreakpointIds": [bp.id for bp in hit_breakpoints], - }, - ) - #print("SENT!") - - #print(f"BLOCK {thread_id=}") - while thread_id in _pause_ids: - _cvar.wait() - #print(f"UNBLOCK {thread_id=}") - - step = _steps.get(thread_id, None) - if step is not None and step.origin is None: - step.origin = frame_obj - step.origin_line = frame_obj.f_lineno - - -def _trace_line(code: CodeType, line_number: int): - if monitoring is None: - return DISABLE - - thread = Thread(threading.current_thread()) - if not thread.is_traced: - return DISABLE - - stop_reason = None - with cvar(13): - if thread.id in _pause_ids: - stop_reason = "pause" - - step = _steps.get(thread.id, None) - is_stepping = step is not None and step.origin is not None - if is_stepping: - # TODO: use CALL/RETURN/PY_RETURN to track these more efficiently. 
- frame_obj = inspect.currentframe().f_back - step_finished = False - if step.step == "in": - if frame_obj is not step.origin or line_number != step.origin_line: - step_finished = True - elif step.step == "out": - step_finished = True - while frame_obj is not None: - if frame_obj is step.origin: - step_finished = False - break - frame_obj = frame_obj.f_back - elif step.step == "over": - step_finished = True - while frame_obj is not None: - if frame_obj is step.origin and frame_obj.f_lineno == step.origin_line: - step_finished = False - break - frame_obj = frame_obj.f_back - else: - raise ValueError(f"Unknown step type: {step.step}") - - if step_finished: - del _steps[thread.id] - _pause_ids.add(thread.id) - _cvar.notify_all() - stop_reason = "step" - - if stop_reason is not None: - return _stop(inspect.currentframe().f_back, stop_reason) - - path = Path(code.co_filename) - try: - path = path.resolve() - except (OSError, RuntimeError): - pass - # print(f"TRACE_LINE {thread_id=}, {path=}, {line_number=}") - - bps = Breakpoint.at(path, line_number) - if not bps and not is_stepping: - return DISABLE - - frame = StackFrame(thread, inspect.currentframe().f_back) - try: - bps_hit = [bp for bp in bps if bp.is_hit(frame)] - if bps_hit: - #print("!BREAKPOINT HIT!") - with cvar(14): - _pause_ids.add(thread.id) - _cvar.notify_all() - return _stop(frame.frame_object, "breakpoint", bps_hit) - finally: - del frame - - -def _trace_py_start(code: CodeType, ip: int): - if threading.current_thread() is not threading.main_thread(): - return - #print(f"TRACE_PY_START {code=}, {ip=}") - - -def _trace_py_resume(code: CodeType, ip: int): - if threading.current_thread() is not threading.main_thread(): - return - #print(f"TRACE_PY_RESUME {code=}, {ip=}") - - -def _trace_py_return(code: CodeType, ip: int, retval: object): - if threading.current_thread() is not threading.main_thread(): - return - try: - retval = repr(retval) - except: - retval = "" - #print(f"TRACE_PY_RETURN {code=}, {ip=}, 
{retval=}") - - -def _trace_py_yield(code: CodeType, ip: int, retval: object): - if threading.current_thread() is not threading.main_thread(): - return - try: - retval = repr(retval) - except: - retval = "" - #print(f"TRACE_PY_YIELD {code=}, {ip=}, {retval=}") - - -def _trace_py_throw(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_PY_THROW {code=}, {ip=}, {exc=}") - - -def _trace_py_unwind(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_PY_UNWIND {code=}, {ip=}, {exc=}") - - -def _trace_raise(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_RAISE {code=}, {ip=}, {exc=}") - - -def _trace_reraise(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_RERAISE {code=}, {ip=}, {exc=}") - - -def _trace_exception_handled(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_EXCEPTION_HANDLED {code=}, {ip=}, {exc=}") diff --git a/src/debugpy/server/tracing/__init__.py b/src/debugpy/server/tracing/__init__.py new file mode 100644 index 000000000..3195d8a2f --- /dev/null +++ b/src/debugpy/server/tracing/__init__.py @@ -0,0 +1,531 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in the project root +# for license information. 
+ +import re +import sys +import threading +import traceback +from collections import defaultdict +from dataclasses import dataclass +from debugpy import server +from debugpy.common import log +from debugpy.server.eval import Scope, VariableContainer +from pathlib import Path +from sys import monitoring +from types import CodeType, FrameType +from typing import Callable, ClassVar, Dict, Iterable, List, Literal, Union + +# Shared for all global state pertaining to breakpoints and stepping. +_cvar = threading.Condition() + + +class Thread: + """ + Represents a DAP Thread object. Instances must never be created directly; + use Thread.from_python_thread() instead. + """ + + id: int + """DAP ID of this thread. Distinct from thread.ident.""" + + python_thread: threading.Thread + """The Python thread object this DAP Thread represents.""" + + is_known_to_adapter: bool + """ + Whether this thread has been reported to the adapter via the + DAP "thread" event with "reason":"started". + """ + + _last_id = 0 + _all: ClassVar[Dict[int, "Thread"]] = {} + + def __init__(self, python_thread): + """ + Create a new Thread object for the given thread. Do not invoke directly; + use Thread.from_python_thread() instead. + """ + self.python_thread = python_thread + self.is_known_to_adapter = False + + with _cvar: + # Thread IDs are serialized as JSON numbers in DAP, which are handled as 64-bit + # floats by most DAP clients. However, OS thread IDs can be large 64-bit integers + # on some platforms. To avoid loss of precision, we map all thread IDs to 32-bit + # signed integers; if the original ID fits, we use it as is, otherwise we use a + # generated negative ID that is guaranteed to fit. 
+ self.id = self.python_thread.ident + if self.id != float(self.id): + Thread._last_id -= 1 + self.id = Thread._last_id + self._all[self.id] = self + + log.info( + f"DAP Thread(id={self.id}) created for Python Thread(ident={self.python_thread.ident})" + ) + + def __getstate__(self): + return { + "id": self.id, + "name": self.name, + } + + @property + def is_debugpy_thread(self): + return getattr(self.python_thread, "is_debugpy_thread", False) + + @property + def is_traced(self): + return not self.is_debugpy_thread + + @property + def name(self): + return self.python_thread.name + + @classmethod + def from_python_thread(self, python_thread: threading.Thread = None) -> "Thread": + """ + Returns the DAP Thread object corresponding to the given Python thread, or for + the current Python thread if None, creating it and reporting it to adapter if + necessary. + """ + if python_thread is None: + python_thread = threading.current_thread() + with _cvar: + for thread in self._all.values(): + if thread.python_thread is python_thread: + break + else: + thread = Thread(python_thread) + thread.make_known_to_adapter() + return thread + + @classmethod + def get(self, id: int) -> Union["Thread", None]: + """ + Finds a thread by its DAP ID. Returns None if ID is unknown. + """ + with _cvar: + return self._all.get(id, None) + + @classmethod + def enumerate(self) -> List["Thread"]: + """ + Returns a list of all running threads in this process. + """ + return [ + thread + for python_thread in threading.enumerate() + for thread in [Thread.from_python_thread(python_thread)] + if thread.is_traced + ] + + def make_known_to_adapter(self): + """ + If adapter is connected to this process, reports this thread to it via DAP + "thread" event with "reason":"started" if it hasn't been reported already. + Returns True if thread is now known to the adapter, and False if there was + no adapter to report it to. 
+ """ + with _cvar: + if not self.is_traced: + return False + if self.is_known_to_adapter: + return True + adapter = server.adapter() + if adapter is None: + return False + adapter.channel.send_event( + "thread", + { + "reason": "started", + "threadId": self.id, + "name": self.name, + }, + ) + self.is_known_to_adapter = True + return True + + def stack_trace(self) -> Iterable["StackFrame"]: + """ + Returns an iterable of StackFrame objects for the current stack of this thread, + starting with the topmost frame. + """ + try: + (fobj,) = ( + fobj for (id, fobj) in sys._current_frames().items() if id == self.id + ) + except ValueError: + raise ValueError(f"Can't get frames for inactive Thread({self.id})") + for fobj, _ in traceback.walk_stack(fobj): + frame = StackFrame.from_frame_object(self, fobj) + if not frame.is_internal(): + yield frame + + +class StackFrame: + """ + Represents a DAP StackFrame object. Instances must never be created directly; + use StackFrame.from_frame_object() instead. + """ + + thread: Thread + frame_object: FrameType + + id: int + _path: Path + _scopes: List[Scope] + + _last_id = 0 + _all: ClassVar[Dict[int, "StackFrame"]] = {} + + def __init__(self, thread: Thread, frame_object: FrameType): + """ + Create a new StackFrame object for the given thread and frame object. Do not + invoke directly; use StackFrame.from_frame_object() instead. + """ + StackFrame._last_id += 1 + self.id = StackFrame._last_id + self.thread = thread + self.frame_object = frame_object + self._path = None + self._scopes = None + self._all[self.id] = self + + def __getstate__(self): + return { + "id": self.id, + "name": self.frame_object.f_code.co_name, + "source": { + # TODO: use "sourceReference" when path isn't available (e.g. 
decompiled code) + "path": str(self.path()), + }, + "line": self.frame_object.f_lineno, + "column": 1, # TODO + # TODO: "endLine", "endColumn", "moduleId", "instructionPointerReference" + } + + @property + def line(self) -> int: + return self.frame_object.f_lineno + + def path(self) -> Path: + if self._path is None: + path = Path(self.frame_object.f_code.co_filename) + try: + path = path.resolve() + except (OSError, RuntimeError): + pass + # No need to sync this since all instances are equivalent. + self._path = path + return self._path + + def is_internal(self) -> bool: + # TODO: filter internal frames properly + parts = self.path().parts + internals = ["debugpy", "threading"] + return any(part.startswith(s) for s in internals for part in parts) + + @classmethod + def from_frame_object( + self, thread: Thread, frame_object: FrameType + ) -> "StackFrame": + for frame in self._all.values(): + if frame.thread is thread and frame.frame_object is frame_object: + return frame + return StackFrame(thread, frame_object) + + @classmethod + def get(self, id: int) -> "StackFrame": + return self._all.get(id, None) + + def scopes(self) -> List[Scope]: + if self._scopes is None: + self._scopes = [ + Scope(self.frame_object, "local"), + Scope(self.frame_object, "global"), + ] + return self._scopes + + @classmethod + def invalidate(self, thread_id: int): + frames = [frame for frame in self._all.values() if frame.thread.id == thread_id] + VariableContainer.invalidate(*frames) + + +@dataclass +class Step: + step: Literal["in", "out", "over"] + origin: FrameType = None + origin_line: int = None + + +class Condition: + """ + Expression that must be true for the breakpoint to be triggered. 
+ """ + + expression: str + """Python expression that must evaluate to True for the breakpoint to be triggered.""" + + _code: CodeType + + def __init__(self, breakpoint: "Breakpoint", expression: str): + self.expression = expression + self._code = compile( + expression, f"breakpoint-{breakpoint.id}-condition", "eval" + ) + + def test(self, frame: StackFrame) -> bool: + """ + Returns True if the breakpoint should be triggered in the specified frame. + """ + try: + return bool( + eval( + self._code, + frame.frame_object.f_globals, + frame.frame_object.f_locals, + ) + ) + except: + log.exception( + f"Exception while evaluating breakpoint condition: {self.expression}" + ) + return False + + +class HitCondition: + """ + Hit count expression that must be True for the breakpoint to be triggered. + + Must have the format `[<operator>]<count>`, where <count> is a positive integer literal, + and <operator> is one of `==` `>` `>=` `<` `<=` `%`, defaulting to `==` if unspecified. + + Examples: + 5: break on the 5th hit + ==5: ditto + >5: break on every hit after the 5th + >=5: break on the 5th hit and thereafter + %5: break on every 5th hit + """ + + _OPERATORS = { + "==": lambda expected_count, count: count == expected_count, + ">": lambda expected_count, count: count > expected_count, + ">=": lambda expected_count, count: count >= expected_count, + "<": lambda expected_count, count: count < expected_count, + "<=": lambda expected_count, count: count <= expected_count, + "%": lambda expected_count, count: count % expected_count == 0, + } + + hit_condition: str + _count: int + _operator: Callable[[int, int], bool] + + def __init__(self, hit_condition: str): + self.hit_condition = hit_condition + m = re.match(r"([<>=]+)?(\d+)", hit_condition) + if not m: + raise ValueError(f"Invalid hit condition: {hit_condition}") + self._count = int(m.group(2)) + try: + op = self._OPERATORS[m.group(1) or "=="] + except KeyError: + raise ValueError(f"Invalid hit condition operator: {op}") + self.test = lambda count: 
op(self._count, count) + + def test(self, count: int) -> bool: + """ + Returns True if the breakpoint should be triggered on the given hit count. + """ + # __init__ replaces this method with an actual implementation from _OPERATORS + # when it parses the condition. + raise NotImplementedError + + +class LogMessage: + """ + A message with spliced expressions, to be logged when a breakpoint is triggered. + """ + + message: str + """The message to be logged. May contain expressions in curly braces.""" + + _code: CodeType + """Compiled code object for the f-string corresponding to the message.""" + + def __init__(self, breakpoint: "Breakpoint", message: str): + self.message = message + f_string = "f" + repr(message) + self._code = compile(f_string, f"breakpoint-{breakpoint.id}-logMessage", "eval") + + def format(self, frame: StackFrame) -> str: + """ + Formats the message using the specified frame's locals and globals. + """ + try: + return eval( + self._code, frame.frame_object.f_globals, frame.frame_object.f_locals + ) + except: + log.exception( + f"Exception while formatting breakpoint log message: {self.message}" + ) + return self.message + + +class Breakpoint: + """ + Represents a DAP Breakpoint. 
+ """ + + id: int + path: Path + line: int + is_enabled: bool + + condition: Condition | None + + hit_condition: HitCondition | None + + log_message: LogMessage | None + + hit_count: int + """Number of times this breakpoint has been hit.""" + + _last_id = 0 + + _all: ClassVar[Dict[int, "Breakpoint"]] = {} + + _at: ClassVar[Dict[Path, Dict[int, List["Breakpoint"]]]] = defaultdict( + lambda: defaultdict(lambda: []) + ) + + def __init__( + self, path, line, *, condition=None, hit_condition=None, log_message=None + ): + with _cvar: + Breakpoint._last_id += 1 + self.id = Breakpoint._last_id + + self.path = path + self.line = line + self.is_enabled = True + self.condition = Condition(self, condition) if condition else None + self.hit_condition = HitCondition(hit_condition) if hit_condition else None + self.log_message = LogMessage(self, log_message) if log_message else None + self.hit_count = 0 + + with _cvar: + self._all[self.id] = self + self._at[self.path][self.line].append(self) + _cvar.notify_all() + + def __getstate__(self): + return { + "line": self.line, + "verified": True, # TODO + } + + @classmethod + def at(self, path: str, line: int) -> List["Breakpoint"]: + """ + Returns a list of all breakpoints at the specified location. + """ + with _cvar: + return self._at[path][line] + + @classmethod + def clear(self, paths: Iterable[str] = None): + """ + Removes all breakpoints in the specified files, or all files if None. + """ + if paths is not None: + paths = [Path(path).resolve() for path in paths] + with _cvar: + if paths is None: + paths = list(self._at.keys()) + for path in paths: + bps_in = self._at.pop(path, {}).values() + for bps_at in bps_in: + for bp in bps_at: + del self._all[bp.id] + _cvar.notify_all() + monitoring.restart_events() + + @classmethod + def set( + self, + path: str, + line: int, + *, + condition=None, + hit_condition=None, + log_message=None, + ) -> "Breakpoint": + """ + Creates a new breakpoint at the specified location. 
+ """ + try: + path = Path(path).resolve() + except (OSError, RuntimeError): + pass + bp = Breakpoint( + path, + line, + condition=condition, + hit_condition=hit_condition, + log_message=log_message, + ) + monitoring.restart_events() + return bp + + def enable(self, is_enabled: bool): + """ + Enables or disables this breakpoint. + """ + with _cvar: + self.is_enabled = is_enabled + _cvar.notify_all() + + def is_triggered(self, frame: StackFrame) -> bool | str: + """ + Determines whether this breakpoint is triggered by the current line in the + specified stack frame, and updates its hit count. + + If the breakpoint is triggered, returns a truthy value; if the breakpoint has + a log message, it is formatted and returned, otherwise True is returned. + """ + with _cvar: + # Check path last since path resolution is potentially expensive. + if ( + not self.is_enabled + or frame.line != self.line + or frame.path() != self.path + ): + return False + + # Hit count must be updated even if conditions are false and execution + # isn't stopped. + self.hit_count += 1 + + # Check hit_condition first since it is faster than checking condition. + if self.hit_condition is not None and not self.hit_condition.test( + self.hit_count + ): + return False + if self.condition is not None and not self.condition.test(frame): + return False + + # If this is a logpoint, return the formatted message instead of True. + if self.log_message is not None: + return self.log_message.format(frame) + + return True + + +# sys.monitoring callbacks are defined in a separate submodule to enable tighter +# control over their use of global state; see comment there for details. +from .tracer import Tracer # noqa diff --git a/src/debugpy/server/tracing/tracer.py b/src/debugpy/server/tracing/tracer.py new file mode 100644 index 000000000..71f2ca509 --- /dev/null +++ b/src/debugpy/server/tracing/tracer.py @@ -0,0 +1,417 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# Licensed under the MIT License. See LICENSE in the project root +# for license information. + +# Once callbacks are registered they are invoked even during finalization when the +# Python is shutting down. Thus, trace_* methods, and any other methods that they +# invoke, must not use any globals from this or other modules (including globals +# that represent imported modules or defined classes!) until it checks that they +# are present, or preload them into class or instance attributes in advance. +# To facilitate this, Tracer is defined in a separate submodule which should not +# contain ANY top-level imports other than typing nor definitions other than the +# class itself. All other imports must be done in class scope and then referred to +# from methods via self. + +from types import CodeType, FrameType +from typing import Iterable + + +class Log: + """ + Safe logging for Tracer. Delegates to debugpy.common.log, but only when it is + safe to do so (i.e. not during finalization). + """ + + from debugpy.common import log + + def __init__(self): + import atexit + + def nop(*args, **kwargs): + pass + + @atexit.register + def disable(): + self.debug = self.info = self.warning = self.error = self.exception = nop + + def debug(self, *args, **kwargs): + # self.log.debug("{0}", *args, **kwargs) + # print(*args) + pass + + def info(self, *args, **kwargs): + self.log.info("{0}", *args, **kwargs) + + def warning(self, *args, **kwargs): + self.log.warning("{0}", *args, **kwargs) + + def error(self, *args, **kwargs): + self.log.error("{0}", *args, **kwargs) + + def exception(self, *args, **kwargs): + self.log.exception("{0}", *args, **kwargs) + + +class Tracer: + """ + Singleton that manages sys.monitoring callbacks for this process. 
+ """ + + import inspect + import threading + from debugpy import server + from debugpy.server.tracing import Breakpoint, Step, Thread, StackFrame, _cvar + from pathlib import Path + from sys import monitoring + + instance: "Tracer" + + log: Log + + _pause_ids = set() + """IDs of threads that are currently pausing or paused.""" + + _steps = {} + """Ongoing steps, keyed by thread ID.""" + + def __init__(self): + self.log = Log() + + @property + def adapter(self): + return self.server.adapter() + + def start(self): + """ + Register sys.monitoring tracing callbacks. + """ + + self.log.info("Registering sys.monitoring tracing callbacks...") + + self.monitoring.use_tool_id(self.monitoring.DEBUGGER_ID, "debugpy") + self.monitoring.set_events( + self.monitoring.DEBUGGER_ID, + ( + self.monitoring.events.LINE + | self.monitoring.events.PY_START + | self.monitoring.events.PY_RETURN + | self.monitoring.events.PY_RESUME + | self.monitoring.events.PY_YIELD + | self.monitoring.events.PY_THROW + | self.monitoring.events.PY_UNWIND + | self.monitoring.events.RAISE + | self.monitoring.events.RERAISE + | self.monitoring.events.EXCEPTION_HANDLED + ), + ) + trace_funcs = { + self.monitoring.events.LINE: self._trace_line, + self.monitoring.events.PY_START: self._trace_py_start, + self.monitoring.events.PY_RESUME: self._trace_py_resume, + self.monitoring.events.PY_RETURN: self._trace_py_return, + self.monitoring.events.PY_YIELD: self._trace_py_yield, + self.monitoring.events.PY_THROW: self._trace_py_throw, + self.monitoring.events.PY_UNWIND: self._trace_py_unwind, + self.monitoring.events.RAISE: self._trace_raise, + self.monitoring.events.RERAISE: self._trace_reraise, + self.monitoring.events.EXCEPTION_HANDLED: self._trace_exception_handled, + } + for event, func in trace_funcs.items(): + self.monitoring.register_callback(self.monitoring.DEBUGGER_ID, event, func) + + self.log.info("sys.monitoring tracing callbacks registered.") + + def pause(self, thread_ids: Iterable[int] = None): + """ 
+ Pause the specified threads, or all threads if thread_ids is None. + """ + if thread_ids is None: + # Pausing is async, so additional threads may be spawned even as we are + # trying to pause the ones we currently know about; iterate until all + # known threads are paused, and no new threads appear. + while True: + thread_ids = {thread.id for thread in self.Thread.enumerate()} + if self._pause_ids.keys() == thread_ids: + return + self.pause(thread_ids) + else: + self.log.info(f"Pausing threads: {thread_ids}") + with self._cvar: + self._pause_ids.update(thread_ids) + self._cvar.notify_all() + self.monitoring.restart_events() + + def resume(self, thread_ids: Iterable[int] = None): + """ + Resume the specified threads, or all threads if thread_ids is None. + """ + with self._cvar: + if thread_ids is None: + self.log.info("Resuming all threads.") + self._pause_ids.clear() + else: + self.log.info(f"Resuming threads: {thread_ids}") + self._pause_ids.difference_update(thread_ids) + self._cvar.notify_all() + self.monitoring.restart_events() + + def abandon_step(self, thread_ids: Iterable[int] = None): + """ + Abandon any ongoing steps that are in progress on the specified threads + (all threads if thread_ids is None). + """ + with self._cvar: + if thread_ids is None: + thread_ids = [thread.id for thread in self.Thread.enumerate()] + for thread_id in thread_ids: + step = self._steps.pop(thread_id, None) + if step is not None: + self.log.info(f"Abandoned step-{step.step} on {thread_id}.") + self._cvar.notify_all() + self.monitoring.restart_events() + + def step_in(self, thread_id: int): + """ + Step into the next statement executed by the specified thread. + """ + self.log.info(f"Step in on thread {thread_id}.") + with self._cvar: + self._steps[thread_id] = self.Step("in") + self._pause_ids.clear() + self._cvar.notify_all() + self.monitoring.restart_events() + + def step_out(self, thread_id: int): + """ + Step out of the current function executed by the specified thread. 
+ """ + self.log.info(f"Step out on thread {thread_id}.") + with self._cvar: + self._steps[thread_id] = self.Step("out") + self._pause_ids.clear() + self._cvar.notify_all() + self.monitoring.restart_events() + + def step_over(self, thread_id: int): + self.log.info(f"Step over on thread {thread_id}.") + """ + Step over the next statement executed by the specified thread. + """ + with self._cvar: + self._steps[thread_id] = self.Step("over") + self._pause_ids.clear() + self._cvar.notify_all() + self.monitoring.restart_events() + + def _stop( + self, + frame_obj: FrameType, + reason: str, + hit_breakpoints: Iterable[Breakpoint] = (), + ): + thread = self.Thread.from_python_thread() + self.log.info(f"Pausing thread {thread.id}: {reason}.") + + with self._cvar: + if thread.id not in self._pause_ids: + return + + self.adapter.channel.send_event( + "stopped", + { + "reason": reason, + "threadId": thread.id, + "allThreadsStopped": False, # TODO + "hitBreakpointIds": [bp.id for bp in hit_breakpoints], + }, + ) + + self.log.info(f"Thread {thread.id} paused.") + while thread.id in self._pause_ids: + self._cvar.wait() + self.log.info(f"Thread {thread.id} unpaused.") + + step = self._steps.get(thread.id, None) + if step is not None and step.origin is None: + step.origin = frame_obj + step.origin_line = frame_obj.f_lineno + + def _trace_line(self, code: CodeType, line_number: int): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + + self.log.debug(f"sys.monitoring event: LINE({line_number}, {code})") + frame_obj = self.inspect.currentframe().f_back + + stop_reason = None + with self._cvar: + if thread.id in self._pause_ids: + stop_reason = "pause" + + step = self._steps.get(thread.id, None) + is_stepping = step is not None and step.origin is not None + if not is_stepping: + self.log.debug(f"No step in progress on thread {thread.id}.") + else: + self.log.debug( + f"Tracing step-{step.step} originating from {step.origin} on 
thread {thread.id}." + ) + + # TODO: use CALL/RETURN/PY_RETURN to track these more efficiently. + step_finished = False + if step.step == "in": + if frame_obj is not step.origin or line_number != step.origin_line: + step_finished = True + elif step.step == "out": + step_finished = True + while frame_obj is not None: + if frame_obj is step.origin: + step_finished = False + break + frame_obj = frame_obj.f_back + elif step.step == "over": + step_finished = True + while frame_obj is not None: + if ( + frame_obj is step.origin + and frame_obj.f_lineno == step.origin_line + ): + step_finished = False + break + frame_obj = frame_obj.f_back + else: + raise ValueError(f"Unknown step type: {step.step}") + + if step_finished: + self.log.info(f"Step-{step.step} finished on thread {thread.id}.") + del self._steps[thread.id] + self._pause_ids.add(thread.id) + self._cvar.notify_all() + stop_reason = "step" + + if stop_reason is not None: + # Even if this thread is pausing, any debugpy internal code on it should + # keep running until it returns to user code; otherwise, it may deadlock + # if it was holding e.g. a messaging lock. 
+ print(frame_obj.f_globals.get("__name__")) + if not frame_obj.f_globals.get("__name__", "").startswith("debugpy"): + return self._stop(frame_obj, stop_reason) + + self.log.debug(f"Resolving path {code.co_filename!r}...") + path = self.Path(code.co_filename) + try: + path = path.resolve() + except (OSError, RuntimeError): + pass + self.log.debug(f"Path {code.co_filename!r} resolved to {path}.") + + bps = self.Breakpoint.at(path, line_number) + if not bps and not is_stepping: + self.log.debug(f"No breakpoints at {path}:{line_number}.") + return self.monitoring.DISABLE + self.log.debug(f"Considering breakpoints: {[bp.__getstate__() for bp in bps]}.") + + frame = self.StackFrame(thread, self.inspect.currentframe().f_back) + try: + stop_bps = [] + for bp in bps: + match bp.is_triggered(frame): + case str() as message: + # Triggered, has logMessage - print it but don't stop. + self.adapter.channel.send_event( + "output", + { + "category": "console", + "output": message, + "line": line_number, + "source": {"path": path}, + }, + ) + case triggered if triggered: + # Triggered, no logMessage - stop. + stop_bps.append(bp) + case _: + continue + + if stop_bps: + self.log.info( + f"Stack frame {frame} stopping at breakpoints {[bp.__getstate__() for bp in stop_bps]}." 
+ ) + with self._cvar: + self._pause_ids.add(thread.id) + self._cvar.notify_all() + return self._stop(frame.frame_object, "breakpoint", stop_bps) + finally: + del frame + + def _trace_py_start(self, code: CodeType, ip: int): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + self.log.debug(f"sys.monitoring event: PY_START({code}, {ip})") + + def _trace_py_resume(self, code: CodeType, ip: int): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + self.log.debug(f"sys.monitoring event: PY_RESUME({code}, {ip})") + + def _trace_py_return(self, code: CodeType, ip: int, retval: object): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + self.log.debug(f"sys.monitoring event: PY_RETURN({code}, {ip})") + # TODO: capture returned value to report it when client requests locals. + pass + + def _trace_py_yield(self, code: CodeType, ip: int, retval: object): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + self.log.debug(f"sys.monitoring event: PY_YIELD({code}, {ip})") + # TODO: capture yielded value to report it when client requests locals. 
+ pass + + def _trace_py_throw(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: PY_THROW({code}, {ip}, {type(exc).__qualname__})" + ) + + def _trace_py_unwind(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: PY_UNWIND({code}, {ip}, {type(exc).__qualname__})" + ) + + def _trace_raise(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: RAISE({code}, {ip}, {type(exc).__qualname__})" + ) + + def _trace_reraise(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: RERAISE({code}, {ip}, {type(exc).__qualname__})" + ) + + def _trace_exception_handled(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: EXCEPTION_HANDLED({code}, {ip}, {type(exc).__qualname__})" + ) + + +Tracer.instance = Tracer() From 256967225e58a8dda1f45662c461229bd343d059 Mon Sep 17 00:00:00 2001 From: Pavel Minaev Date: Thu, 1 Feb 2024 11:40:05 -0800 Subject: [PATCH 2/3] Fix #1510: [sys.monitoring] Deadlocks when breakpoints are hit --- src/debugpy/server/tracing/__init__.py | 22 ++++++++++++---------- src/debugpy/server/tracing/tracer.py | 20 ++++++++++---------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/debugpy/server/tracing/__init__.py b/src/debugpy/server/tracing/__init__.py index 3195d8a2f..4413d424c 100644 --- a/src/debugpy/server/tracing/__init__.py +++ b/src/debugpy/server/tracing/__init__.py @@ -38,6 +38,12 @@ class Thread: DAP "thread" event with 
"reason":"started". """ + is_traced: bool + """ + Whether this thread is traced. Threads are normally traced, but API clients + can exclude a specific thread from tracing. + """ + _last_id = 0 _all: ClassVar[Dict[int, "Thread"]] = {} @@ -48,6 +54,7 @@ def __init__(self, python_thread): """ self.python_thread = python_thread self.is_known_to_adapter = False + self.is_traced = True with _cvar: # Thread IDs are serialized as JSON numbers in DAP, which are handled as 64-bit @@ -71,14 +78,6 @@ def __getstate__(self): "name": self.name, } - @property - def is_debugpy_thread(self): - return getattr(self.python_thread, "is_debugpy_thread", False) - - @property - def is_traced(self): - return not self.is_debugpy_thread - @property def name(self): return self.python_thread.name @@ -88,10 +87,13 @@ def from_python_thread(self, python_thread: threading.Thread = None) -> "Thread" """ Returns the DAP Thread object corresponding to the given Python thread, or for the current Python thread if None, creating it and reporting it to adapter if - necessary. + necessary. If the current thread is internal debugpy thread, returns None. 
""" + if python_thread is None: python_thread = threading.current_thread() + if getattr(python_thread, "is_debugpy_thread", False): + return None with _cvar: for thread in self._all.values(): if thread.python_thread is python_thread: @@ -118,7 +120,7 @@ def enumerate(self) -> List["Thread"]: thread for python_thread in threading.enumerate() for thread in [Thread.from_python_thread(python_thread)] - if thread.is_traced + if thread is not None and thread.is_traced ] def make_known_to_adapter(self): diff --git a/src/debugpy/server/tracing/tracer.py b/src/debugpy/server/tracing/tracer.py index 71f2ca509..fc3853071 100644 --- a/src/debugpy/server/tracing/tracer.py +++ b/src/debugpy/server/tracing/tracer.py @@ -238,7 +238,7 @@ def _stop( def _trace_line(self, code: CodeType, line_number: int): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.monitoring.DISABLE self.log.debug(f"sys.monitoring event: LINE({line_number}, {code})") @@ -347,19 +347,19 @@ def _trace_line(self, code: CodeType, line_number: int): def _trace_py_start(self, code: CodeType, ip: int): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.monitoring.DISABLE self.log.debug(f"sys.monitoring event: PY_START({code}, {ip})") def _trace_py_resume(self, code: CodeType, ip: int): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.monitoring.DISABLE self.log.debug(f"sys.monitoring event: PY_RESUME({code}, {ip})") def _trace_py_return(self, code: CodeType, ip: int, retval: object): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.monitoring.DISABLE self.log.debug(f"sys.monitoring event: PY_RETURN({code}, {ip})") # TODO: capture returned value to report it when client requests locals. 
@@ -367,7 +367,7 @@ def _trace_py_return(self, code: CodeType, ip: int, retval: object): def _trace_py_yield(self, code: CodeType, ip: int, retval: object): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.monitoring.DISABLE self.log.debug(f"sys.monitoring event: PY_YIELD({code}, {ip})") # TODO: capture yielded value to report it when client requests locals. @@ -375,7 +375,7 @@ def _trace_py_yield(self, code: CodeType, ip: int, retval: object): def _trace_py_throw(self, code: CodeType, ip: int, exc: BaseException): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.log.debug( f"sys.monitoring event: PY_THROW({code}, {ip}, {type(exc).__qualname__})" @@ -383,7 +383,7 @@ def _trace_py_throw(self, code: CodeType, ip: int, exc: BaseException): def _trace_py_unwind(self, code: CodeType, ip: int, exc: BaseException): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.log.debug( f"sys.monitoring event: PY_UNWIND({code}, {ip}, {type(exc).__qualname__})" @@ -391,7 +391,7 @@ def _trace_py_unwind(self, code: CodeType, ip: int, exc: BaseException): def _trace_raise(self, code: CodeType, ip: int, exc: BaseException): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.log.debug( f"sys.monitoring event: RAISE({code}, {ip}, {type(exc).__qualname__})" @@ -399,7 +399,7 @@ def _trace_raise(self, code: CodeType, ip: int, exc: BaseException): def _trace_reraise(self, code: CodeType, ip: int, exc: BaseException): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.log.debug( f"sys.monitoring event: RERAISE({code}, {ip}, {type(exc).__qualname__})" @@ -407,7 +407,7 @@ def _trace_reraise(self, code: CodeType, ip: int, 
exc: BaseException): def _trace_exception_handled(self, code: CodeType, ip: int, exc: BaseException): thread = self.Thread.from_python_thread() - if not thread.is_traced: + if thread is None or not thread.is_traced: return self.log.debug( f"sys.monitoring event: EXCEPTION_HANDLED({code}, {ip}, {type(exc).__qualname__})" From 79d86b9510f58078e6fc4c388d88d7f83f9d41b2 Mon Sep 17 00:00:00 2001 From: Pavel Minaev Date: Sun, 25 Feb 2024 21:22:50 -0800 Subject: [PATCH 3/3] Fix race conditions. --- src/debugpy/server/__init__.py | 12 + src/debugpy/server/adapters.py | 146 ++++++++--- src/debugpy/server/tracing/__init__.py | 320 ++++++++++++++--------- src/debugpy/server/tracing/tracer.py | 346 ++++++++++++------------- 4 files changed, 479 insertions(+), 345 deletions(-) diff --git a/src/debugpy/server/__init__.py b/src/debugpy/server/__init__.py index e4233bc4b..8691d5a0c 100644 --- a/src/debugpy/server/__init__.py +++ b/src/debugpy/server/__init__.py @@ -2,6 +2,18 @@ # Licensed under the MIT License. See LICENSE in the project root # for license information. +import itertools + +# Unique IDs for DAP objects such as threads, variables, breakpoints etc. These are +# negative to allow for pre-existing OS-assigned IDs (which are positive) to be used +# where available, e.g. for threads. 
+_dap_ids = itertools.count(-1, -1) + + +def new_dap_id(): + """Returns the next unique ID.""" + return next(_dap_ids) + def adapter(): """ diff --git a/src/debugpy/server/adapters.py b/src/debugpy/server/adapters.py index c913f0a4d..1a0706783 100644 --- a/src/debugpy/server/adapters.py +++ b/src/debugpy/server/adapters.py @@ -10,8 +10,17 @@ from debugpy.adapter import components from debugpy.common import json, log, messaging, sockets from debugpy.common.messaging import MessageDict, Request -from debugpy.server import eval -from debugpy.server.tracing import Breakpoint, StackFrame, Thread, Tracer +from debugpy.server import eval, new_dap_id +from debugpy.server.tracing import ( + Breakpoint, + Condition, + HitCondition, + LogMessage, + Source, + StackFrame, + Thread, + Tracer, +) class Adapter: @@ -125,18 +134,20 @@ def initialize_request(self, request: Request): "default": False, "description": "Break whenever any exception is raised.", }, - { - "filter": "uncaught", - "label": "Uncaught Exceptions", - "default": True, - "description": "Break when the process is exiting due to unhandled exception.", - }, - { - "filter": "userUnhandled", - "label": "User Uncaught Exceptions", - "default": False, - "description": "Break when exception escapes into library code.", - }, + # TODO: https://github.com/microsoft/debugpy/issues/1453 + # { + # "filter": "uncaught", + # "label": "Uncaught Exceptions", + # "default": True, + # "description": "Break when the process is exiting due to unhandled exception.", + # }, + # TODO: https://github.com/microsoft/debugpy/issues/1454 + # { + # "filter": "userUnhandled", + # "label": "User Uncaught Exceptions", + # "default": False, + # "description": "Break when exception escapes into library code.", + # }, ] return { @@ -219,8 +230,7 @@ def setExceptionBreakpoints_request(self, request: Request): def setBreakpoints_request(self, request: Request): # TODO: implement source.reference for setting breakpoints in sources for # which source 
code was decompiled or retrieved via inspect.getsource. - source = request("source", json.object()) - path = source("path", str) + source = Source(request("source", json.object())("path", str)) # TODO: implement column support. # Use dis.get_instruction() to iterate over instructions and corresponding @@ -236,15 +246,66 @@ def setBreakpoints_request(self, request: Request): lines = request("lines", json.array(int)) bps = [MessageDict(request, {"line": line}) for line in lines] - Breakpoint.clear([path]) + Breakpoint.clear([source]) + + # Do the first pass validating conditions and log messages for syntax errors; if + # any breakpoint fails validation, we want to respond with an error right away + # so that user gets immediate feedback, but this also means that we shouldn't + # actually set any breakpoints until we've validated all of them. + bps_info = [] + for bp in bps: + id = new_dap_id() + line = bp("line", int) + + # A missing condition or log message can be represented as the corresponding + # property missing, or as the property being present but set to empty string. 
+ + condition = bp("condition", str, optional=True) + if condition: + try: + condition = Condition(id, condition) + except SyntaxError as exc: + raise request.isnt_valid( + f"Syntax error in condition ({condition}): {exc}" + ) + else: + condition = None + + hit_condition = bp("hitCondition", str, optional=True) + if hit_condition: + try: + hit_condition = HitCondition(id, hit_condition) + except SyntaxError as exc: + raise request.isnt_valid( + f"Syntax error in hit condition ({hit_condition}): {exc}" + ) + else: + hit_condition = None + + log_message = bp("logMessage", str, optional=True) + if log_message: + try: + log_message = LogMessage(id, log_message) + except SyntaxError as exc: + raise request.isnt_valid( + f"Syntax error in log message f{log_message!r}: {exc}" + ) + else: + log_message = None + + bps_info.append((id, source, line, condition, hit_condition, log_message)) + + # Now that we know all breakpoints are syntactically valid, we can set them. bps_set = [ - Breakpoint.set( - path, bp["line"], - condition=bp("condition", str, optional=True), - hit_condition=bp("hitCondition", str, optional=True), - log_message=bp("logMessage", str, optional=True), + Breakpoint( + id, + source, + line, + condition=condition, + hit_condition=hit_condition, + log_message=log_message, ) - for bp in bps + for id, source, line, condition, hit_condition, log_message in bps_info ] return {"breakpoints": bps_set} @@ -269,39 +330,44 @@ def stackTrace_request(self, request: Request): finally: del frames + # For "pause" and "continue" requests, DAP requires a thread ID to be specified, + # but does not require the adapter to only pause/unpause the specified thread. + # Visual Studio debug adapter host does not support the ability to pause/unpause + # only the specified thread, and requires the adapter to always pause/unpause all + # threads. 
For "continue" requests, there is a capability flag that the client can + # use to indicate support for per-thread continuation, but there's no such flag + # for per-thread pausing. Furthermore, the semantics of unpausing a specific + # thread after all threads have been paused is unclear in the event the unpaused + # thread then spawns additional threads. Therefore, we always ignore the "threadId" + # property and just pause/unpause everything. + def pause_request(self, request: Request): - if request.arguments.get("threadId", None) == "*": - thread_ids = None - else: - thread_ids = [request("threadId", int)] - self._tracer.pause(thread_ids) + try: + self._tracer.pause() + except ValueError: + raise request.cant_handle("No threads to pause") return {} def continue_request(self, request: Request): - if request.arguments.get("threadId", None) == "*": - thread_ids = None - else: - thread_ids = [request("threadId", int)] - single_thread = request("singleThread", False) - self._tracer.resume(thread_ids if single_thread else None) + self._tracer.resume() return {} def stepIn_request(self, request: Request): # TODO: support "singleThread" and "granularity" - thread_id = request("threadId", int) - self._tracer.step_in(thread_id) + thread = Thread.get(request("threadId", int)) + self._tracer.step_in(thread) return {} def stepOut_request(self, request: Request): # TODO: support "singleThread" and "granularity" - thread_id = request("threadId", int) - self._tracer.step_out(thread_id) + thread = Thread.get(request("threadId", int)) + self._tracer.step_out(thread) return {} def next_request(self, request: Request): # TODO: support "singleThread" and "granularity" - thread_id = request("threadId", int) - self._tracer.step_over(thread_id) + thread = Thread.get(request("threadId", int)) + self._tracer.step_over(thread) return {} def scopes_request(self, request: Request): diff --git a/src/debugpy/server/tracing/__init__.py b/src/debugpy/server/tracing/__init__.py index
4413d424c..eee0fdf68 100644 --- a/src/debugpy/server/tracing/__init__.py +++ b/src/debugpy/server/tracing/__init__.py @@ -3,13 +3,13 @@ # for license information. import re -import sys import threading import traceback from collections import defaultdict from dataclasses import dataclass from debugpy import server from debugpy.common import log +from debugpy.server import new_dap_id from debugpy.server.eval import Scope, VariableContainer from pathlib import Path from sys import monitoring @@ -20,6 +20,47 @@ _cvar = threading.Condition() +class Source: + """ + Represents a DAP Source object. + """ + + path: str + """ + Path to the source file; immutable. Note that this need not be an actual valid + path on the filesystem; values such as <string> or <stdin> are also allowed. + """ + + # TODO: support "sourceReference" for cases where path isn't available (e.g. decompiled code) + + def __init__(self, path: str): + # If it is a valid file path, we want to resolve and normalize it, so that it + # can be unambiguously compared to code object paths later. + try: + path = str(Path(path).resolve()) + except (OSError, RuntimeError): + # Something like <string> or <stdin> + pass + self.path = path + + def __getstate__(self) -> dict: + return {"path": self.path} + + def __repr__(self) -> str: + return f"Source({self.path!r})" + + def __str__(self) -> str: + return self.path + + def __eq__(self, other) -> bool: + if not isinstance(other, Source): + return False + return self.path == other.path + + def __hash__(self) -> int: + return hash(self.path) + + class Thread: """ Represents a DAP Thread object. Instances must never be created directly; @@ -32,6 +73,12 @@ class Thread: python_thread: threading.Thread """The Python thread object this DAP Thread represents.""" + python_frame: FrameType | None + """ + The Python frame object corresponding to the topmost stack frame on this thread + if it is suspended, or None if it is running.
+ """ + is_known_to_adapter: bool """ Whether this thread has been reported to the adapter via the @@ -44,42 +91,46 @@ class Thread: can exclude a specific thread from tracing. """ - _last_id = 0 _all: ClassVar[Dict[int, "Thread"]] = {} - def __init__(self, python_thread): + def __init__(self, python_thread: threading.Thread): """ Create a new Thread object for the given thread. Do not invoke directly; use Thread.get() instead. """ + self.python_thread = python_thread + self.current_frame = None self.is_known_to_adapter = False self.is_traced = True - with _cvar: - # Thread IDs are serialized as JSON numbers in DAP, which are handled as 64-bit - # floats by most DAP clients. However, OS thread IDs can be large 64-bit integers - # on some platforms. To avoid loss of precision, we map all thread IDs to 32-bit - # signed integers; if the original ID fits, we use it as is, otherwise we use a - # generated negative ID that is guaranteed to fit. - self.id = self.python_thread.ident - if self.id != float(self.id): - Thread._last_id -= 1 - self.id = Thread._last_id - self._all[self.id] = self + # Thread IDs are serialized as JSON numbers in DAP, which are handled as 64-bit + # floats by most DAP clients. However, OS thread IDs can be large 64-bit integers + # on some platforms. To avoid loss of precision, we map all thread IDs to 32-bit + # signed integers; if the original ID fits, we use it as is, otherwise we use a + # generated negative ID that is guaranteed to fit. 
+ self.id = self.python_thread.ident + assert self.id is not None + + if self.id < 0 or self.id != float(self.id): + self.id = new_dap_id() + self._all[self.id] = self log.info( - f"DAP Thread(id={self.id}) created for Python Thread(ident={self.python_thread.ident})" + f"DAP {self} created for Python Thread(ident={self.python_thread.ident})" ) - def __getstate__(self): + def __repr__(self) -> str: + return f"Thread({self.id})" + + def __getstate__(self) -> dict: return { "id": self.id, "name": self.name, } @property - def name(self): + def name(self) -> str: return self.python_thread.name @classmethod @@ -89,18 +140,19 @@ def from_python_thread(self, python_thread: threading.Thread = None) -> "Thread" the current Python thread if None, creating it and reporting it to adapter if necessary. If the current thread is internal debugpy thread, returns None. """ - if python_thread is None: python_thread = threading.current_thread() + if python_thread.ident is None: + return None if getattr(python_thread, "is_debugpy_thread", False): return None with _cvar: for thread in self._all.values(): - if thread.python_thread is python_thread: + if thread.python_thread.ident == python_thread.ident: break else: thread = Thread(python_thread) - thread.make_known_to_adapter() + thread.make_known_to_adapter() return thread @classmethod @@ -112,16 +164,11 @@ def get(self, id: int) -> Union["Thread", None]: return self._all.get(id, None) @classmethod - def enumerate(self) -> List["Thread"]: + def enumerate(self) -> list["Thread"]: """ - Returns a list of all running threads in this process. + Returns all running threads in this process. 
""" - return [ - thread - for python_thread in threading.enumerate() - for thread in [Thread.from_python_thread(python_thread)] - if thread is not None and thread.is_traced - ] + return [thread for thread in self._all.values() if thread.is_traced] def make_known_to_adapter(self): """ @@ -155,15 +202,16 @@ def stack_trace(self) -> Iterable["StackFrame"]: starting with the topmost frame. """ try: - (fobj,) = ( - fobj for (id, fobj) in sys._current_frames().items() if id == self.id - ) + with _cvar: + python_frame = self.python_frame except ValueError: raise ValueError(f"Can't get frames for inactive Thread({self.id})") - for fobj, _ in traceback.walk_stack(fobj): - frame = StackFrame.from_frame_object(self, fobj) + for python_frame, _ in traceback.walk_stack(python_frame): + frame = StackFrame.from_frame_object(self, python_frame) + log.info("{0}", f"{self}: {frame}") if not frame.is_internal(): yield frame + log.info("{0}", f"{self}: End stack trace.") class StackFrame: @@ -176,10 +224,9 @@ class StackFrame: frame_object: FrameType id: int - _path: Path + _source: Source | None _scopes: List[Scope] - _last_id = 0 _all: ClassVar[Dict[int, "StackFrame"]] = {} def __init__(self, thread: Thread, frame_object: FrameType): @@ -187,45 +234,44 @@ def __init__(self, thread: Thread, frame_object: FrameType): Create a new StackFrame object for the given thread and frame object. Do not invoke directly; use StackFrame.from_frame_object() instead. """ - StackFrame._last_id += 1 - self.id = StackFrame._last_id + self.id = new_dap_id() self.thread = thread self.frame_object = frame_object - self._path = None + self._source = None self._scopes = None self._all[self.id] = self - def __getstate__(self): + def __getstate__(self) -> dict: return { "id": self.id, "name": self.frame_object.f_code.co_name, - "source": { - # TODO: use "sourceReference" when path isn't available (e.g. 
decompiled code) - "path": str(self.path()), - }, + "source": self.source(), "line": self.frame_object.f_lineno, "column": 1, # TODO # TODO: "endLine", "endColumn", "moduleId", "instructionPointerReference" } + def __repr__(self) -> str: + result = f"StackFrame({self.id}, {self.frame_object}" + if self.is_internal(): + result += ", internal=True" + result += ")" + return result + @property def line(self) -> int: return self.frame_object.f_lineno - def path(self) -> Path: - if self._path is None: - path = Path(self.frame_object.f_code.co_filename) - try: - path = path.resolve() - except (OSError, RuntimeError): - pass - # No need to sync this since all instances are equivalent. - self._path = path - return self._path + def source(self) -> Source: + if self._source is None: + # No need to sync this since all instances created from the same path + # are equivalent for all purposes. + self._source = Source(self.frame_object.f_code.co_filename) + return self._source def is_internal(self) -> bool: # TODO: filter internal frames properly - parts = self.path().parts + parts = Path(self.source().path).parts internals = ["debugpy", "threading"] return any(part.startswith(s) for s in internals for part in parts) @@ -262,38 +308,68 @@ class Step: origin: FrameType = None origin_line: int = None + def __repr__(self): + return f"Step({self.step})" + + def is_complete(self, python_frame: FrameType) -> bool: + is_complete = False + if self.step == "in": + is_complete = ( + python_frame is not self.origin + or python_frame.f_lineno != self.origin_line + ) + elif self.step == "over": + is_complete = True + for python_frame, _ in traceback.walk_stack(python_frame): + if ( + python_frame is self.origin + and python_frame.f_lineno == self.origin_line + ): + is_complete = False + break + return is_complete + elif self.step == "out": + while python_frame is not None: + if python_frame is self.origin: + is_complete = False + break + else: + raise ValueError(f"Unknown step type: 
{self.step}") + return is_complete + class Condition: """ Expression that must be true for the breakpoint to be triggered. """ + id: int + """Used to identify the condition in stack traces. Should match breakpoint ID.""" + expression: str """Python expression that must evaluate to True for the breakpoint to be triggered.""" _code: CodeType - def __init__(self, breakpoint: "Breakpoint", expression: str): + def __init__(self, id: int, expression: str): + self.id = id self.expression = expression - self._code = compile( - expression, f"breakpoint-{breakpoint.id}-condition", "eval" - ) + self._code = compile(expression, f"breakpoint-{id}-condition", "eval") def test(self, frame: StackFrame) -> bool: """ Returns True if the breakpoint should be triggered in the specified frame. """ try: - return bool( - eval( - self._code, - frame.frame_object.f_globals, - frame.frame_object.f_locals, - ) + result = eval( + self._code, + frame.frame_object.f_globals, + frame.frame_object.f_locals, ) - except: - log.exception( - f"Exception while evaluating breakpoint condition: {self.expression}" + return bool(result) + except BaseException as exc: + log.error( + f"Exception while evaluating breakpoint condition ({self.expression}): {exc}" ) return False @@ -322,20 +398,26 @@ class HitCondition: "%": lambda expected_count, count: count % expected_count == 0, } + id: int + """Used to identify the condition in stack traces. 
Should match breakpoint ID.""" + hit_condition: str + """Hit count expression.""" + _count: int _operator: Callable[[int, int], bool] - def __init__(self, hit_condition: str): + def __init__(self, id: int, hit_condition: str): + self.id = id self.hit_condition = hit_condition - m = re.match(r"([<>=]+)?(\d+)", hit_condition) + m = re.match(r"^\D*(\d+)$", hit_condition) if not m: - raise ValueError(f"Invalid hit condition: {hit_condition}") + raise SyntaxError(f"Invalid hit condition: {hit_condition}") self._count = int(m.group(2)) try: op = self._OPERATORS[m.group(1) or "=="] except KeyError: - raise ValueError(f"Invalid hit condition operator: {op}") + raise SyntaxError(f"Invalid hit condition operator: {op}") self.test = lambda count: op(self._count, count) def test(self, count: int) -> bool: @@ -352,16 +434,20 @@ class LogMessage: A message with spliced expressions, to be logged when a breakpoint is triggered. """ + id: int + """Used to identify the condition in stack traces. Should match breakpoint ID.""" + message: str """The message to be logged. 
May contain expressions in curly braces.""" _code: CodeType """Compiled code object for the f-string corresponding to the message.""" - def __init__(self, breakpoint: "Breakpoint", message: str): + def __init__(self, id: int, message: str): + self.id = id self.message = message f_string = "f" + repr(message) - self._code = compile(f_string, f"breakpoint-{breakpoint.id}-logMessage", "eval") + self._code = compile(f_string, f"breakpoint-{id}-logMessage", "eval") def format(self, frame: StackFrame) -> str: """ @@ -371,9 +457,9 @@ def format(self, frame: StackFrame) -> str: return eval( self._code, frame.frame_object.f_globals, frame.frame_object.f_locals ) - except: + except BaseException as exc: log.exception( - f"Exception while formatting breakpoint log message: {self.message}" + f"Exception while formatting breakpoint log message f{self.message!r}: {exc}" ) return self.message @@ -384,7 +470,7 @@ class Breakpoint: """ id: int - path: Path + source: Source line: int is_enabled: bool @@ -397,93 +483,69 @@ class Breakpoint: hit_count: int """Number of times this breakpoint has been hit.""" - _last_id = 0 - _all: ClassVar[Dict[int, "Breakpoint"]] = {} - _at: ClassVar[Dict[Path, Dict[int, List["Breakpoint"]]]] = defaultdict( + _at: ClassVar[Dict[Source, Dict[int, List["Breakpoint"]]]] = defaultdict( lambda: defaultdict(lambda: []) ) + # ID must be explicitly specified so that conditions and log message can + # use the same ID - this makes for better call stacks and error messages. 
def __init__( - self, path, line, *, condition=None, hit_condition=None, log_message=None + self, + id: int, + source: Source, + line: int, + *, + condition: Condition | None = None, + hit_condition: HitCondition | None = None, + log_message: LogMessage | None = None, ): - with _cvar: - Breakpoint._last_id += 1 - self.id = Breakpoint._last_id - - self.path = path + self.id = id + self.source = source self.line = line self.is_enabled = True - self.condition = Condition(self, condition) if condition else None - self.hit_condition = HitCondition(hit_condition) if hit_condition else None - self.log_message = LogMessage(self, log_message) if log_message else None + self.condition = condition + self.hit_condition = hit_condition + self.log_message = log_message self.hit_count = 0 with _cvar: self._all[self.id] = self - self._at[self.path][self.line].append(self) + self._at[self.source][self.line].append(self) _cvar.notify_all() + monitoring.restart_events() - def __getstate__(self): + def __getstate__(self) -> dict: return { "line": self.line, "verified": True, # TODO } @classmethod - def at(self, path: str, line: int) -> List["Breakpoint"]: + def at(self, source: Source, line: int) -> List["Breakpoint"]: """ Returns a list of all breakpoints at the specified location. """ with _cvar: - return self._at[path][line] + return self._at[source][line] @classmethod - def clear(self, paths: Iterable[str] = None): + def clear(self, sources: Iterable[Source] = None): """ Removes all breakpoints in the specified files, or all files if None. 
""" - if paths is not None: - paths = [Path(path).resolve() for path in paths] with _cvar: - if paths is None: - paths = list(self._at.keys()) - for path in paths: - bps_in = self._at.pop(path, {}).values() + if sources is None: + sources = list(self._at.keys()) + for source in sources: + bps_in = self._at.pop(source, {}).values() for bps_at in bps_in: for bp in bps_at: del self._all[bp.id] _cvar.notify_all() monitoring.restart_events() - @classmethod - def set( - self, - path: str, - line: int, - *, - condition=None, - hit_condition=None, - log_message=None, - ) -> "Breakpoint": - """ - Creates a new breakpoint at the specified location. - """ - try: - path = Path(path).resolve() - except (OSError, RuntimeError): - pass - bp = Breakpoint( - path, - line, - condition=condition, - hit_condition=hit_condition, - log_message=log_message, - ) - monitoring.restart_events() - return bp - def enable(self, is_enabled: bool): """ Enables or disables this breakpoint. @@ -501,11 +563,11 @@ def is_triggered(self, frame: StackFrame) -> bool | str: a log message, it is formatted and returned, otherwise True is returned. """ with _cvar: - # Check path last since path resolution is potentially expensive. + # Check source last since path resolution is potentially expensive. if ( not self.is_enabled or frame.line != self.line - or frame.path() != self.path + or frame.source() != self.source ): return False @@ -520,7 +582,7 @@ def is_triggered(self, frame: StackFrame) -> bool | str: return False if self.condition is not None and not self.condition.test(frame): return False - + # If this is a logpoint, return the formatted message instead of True. 
if self.log_message is not None: return self.log_message.format(frame) diff --git a/src/debugpy/server/tracing/tracer.py b/src/debugpy/server/tracing/tracer.py index fc3853071..70778d8b4 100644 --- a/src/debugpy/server/tracing/tracer.py +++ b/src/debugpy/server/tracing/tracer.py @@ -35,6 +35,7 @@ def disable(): self.debug = self.info = self.warning = self.error = self.exception = nop def debug(self, *args, **kwargs): + # TODO: improve logging performance enough to enable this. # self.log.debug("{0}", *args, **kwargs) # print(*args) pass @@ -60,19 +61,29 @@ class Tracer: import inspect import threading from debugpy import server - from debugpy.server.tracing import Breakpoint, Step, Thread, StackFrame, _cvar - from pathlib import Path + from debugpy.server.tracing import ( + Breakpoint, + Source, + Step, + Thread, + StackFrame, + _cvar, + ) from sys import monitoring instance: "Tracer" log: Log - _pause_ids = set() - """IDs of threads that are currently pausing or paused.""" + _stopped_by: Thread | None = None + """ + If not None, indicates the thread on which the event that caused the debuggee + to enter suspended state has occurred. When any other thread observes a non-None + value of this attribute, it must immediately suspend and wait until it is cleared. + """ - _steps = {} - """Ongoing steps, keyed by thread ID.""" + _steps: dict[Thread, Step] = {} + """Ongoing steps, keyed by thread.""" def __init__(self): self.log = Log() @@ -121,118 +132,130 @@ def start(self): self.log.info("sys.monitoring tracing callbacks registered.") - def pause(self, thread_ids: Iterable[int] = None): + def pause(self): """ - Pause the specified threads, or all threads if thread_ids is None. + Pause all threads. """ - if thread_ids is None: - # Pausing is async, so additional threads may be spawned even as we are - # trying to pause the ones we currently know about; iterate until all - # known threads are paused, and no new threads appear. 
- while True: - thread_ids = {thread.id for thread in self.Thread.enumerate()} - if self._pause_ids.keys() == thread_ids: - return - self.pause(thread_ids) - else: - self.log.info(f"Pausing threads: {thread_ids}") - with self._cvar: - self._pause_ids.update(thread_ids) - self._cvar.notify_all() - self.monitoring.restart_events() - - def resume(self, thread_ids: Iterable[int] = None): + self.log.info("Pausing all threads.") + with self._cvar: + # Although "pause" is a user-induced scenario that is not specifically + # associated with any thread, we still need to pick some thread that + # will nominally own it to report the event on. If there is a designated + # main thread in the process, use that, otherwise pick one at random. + python_thread = self.threading.main_thread() + if python_thread is None: + python_thread = next(iter(self.threading.enumerate()), None) + if python_thread is None: + raise ValueError("No threads to pause.") + thread = self.Thread.from_python_thread(python_thread) + self.begin_stop(thread, "pause") + + def resume(self): """ - Resume the specified threads, or all threads if thread_ids is None. + Resume all threads. """ - with self._cvar: - if thread_ids is None: - self.log.info("Resuming all threads.") - self._pause_ids.clear() - else: - self.log.info(f"Resuming threads: {thread_ids}") - self._pause_ids.difference_update(thread_ids) - self._cvar.notify_all() - self.monitoring.restart_events() + self.log.info("Resuming all threads.") + self.end_stop() - def abandon_step(self, thread_ids: Iterable[int] = None): + def abandon_step(self, threads: Iterable[int] = None): """ Abandon any ongoing steps that are in progress on the specified threads - (all threads if thread_ids is None). + (all threads if argument is None).
""" with self._cvar: - if thread_ids is None: - thread_ids = [thread.id for thread in self.Thread.enumerate()] - for thread_id in thread_ids: - step = self._steps.pop(thread_id, None) - if step is not None: - self.log.info(f"Abandoned step-{step.step} on {thread_id}.") + if threads is None: + step = self._steps.clear() + while self._steps: + thread, step = self._steps.popitem() + self.log.info(f"Abandoned {step} on {thread}.") + else: + for thread in threads: + step = self._steps.pop(thread, None) + if step is not None: + self.log.info(f"Abandoned {step} on {thread}.") self._cvar.notify_all() self.monitoring.restart_events() - def step_in(self, thread_id: int): + def step_in(self, thread: Thread): """ Step into the next statement executed by the specified thread. """ - self.log.info(f"Step in on thread {thread_id}.") + self.log.info(f"Step in on {thread}.") with self._cvar: - self._steps[thread_id] = self.Step("in") - self._pause_ids.clear() - self._cvar.notify_all() + self._steps[thread] = self.Step("in") + self.end_stop() self.monitoring.restart_events() - def step_out(self, thread_id: int): + def step_out(self, thread: Thread): """ Step out of the current function executed by the specified thread. """ - self.log.info(f"Step out on thread {thread_id}.") + self.log.info(f"Step out on {thread}.") with self._cvar: - self._steps[thread_id] = self.Step("out") - self._pause_ids.clear() - self._cvar.notify_all() + self._steps[thread] = self.Step("out") + self.end_stop() self.monitoring.restart_events() - def step_over(self, thread_id: int): - self.log.info(f"Step over on thread {thread_id}.") + def step_over(self, thread: Thread): + self.log.info(f"Step over on {thread}.") """ Step over the next statement executed by the specified thread. 
""" with self._cvar: - self._steps[thread_id] = self.Step("over") - self._pause_ids.clear() - self._cvar.notify_all() + self._steps[thread] = self.Step("over") + self.end_stop() self.monitoring.restart_events() - def _stop( - self, - frame_obj: FrameType, - reason: str, - hit_breakpoints: Iterable[Breakpoint] = (), + def begin_stop( + self, thread: Thread, reason: str, hit_breakpoints: Iterable[Breakpoint] = () ): - thread = self.Thread.from_python_thread() - self.log.info(f"Pausing thread {thread.id}: {reason}.") + """ + Report the stop to the adapter and tell all threads to suspend themselves. + """ with self._cvar: - if thread.id not in self._pause_ids: + self._stopped_by = thread + self._cvar.notify_all() + self.monitoring.restart_events() + self.adapter.channel.send_event( + "stopped", + { + "reason": reason, + "threadId": thread.id, + "allThreadsStopped": True, + "hitBreakpointIds": [bp.id for bp in hit_breakpoints], + }, + ) + + def end_stop(self): + """ + Tell all threads to resume themselves. + """ + with self._cvar: + self._stopped_by = None + self._cvar.notify_all() + + def suspend_this_thread(self, frame_obj: FrameType): + """ + Suspends execution of this thread until the current stop ends. 
+ """ + + thread = self.Thread.from_python_thread() + with self._cvar: + if self._stopped_by is None: return - self.adapter.channel.send_event( - "stopped", - { - "reason": reason, - "threadId": thread.id, - "allThreadsStopped": False, # TODO - "hitBreakpointIds": [bp.id for bp in hit_breakpoints], - }, - ) - - self.log.info(f"Thread {thread.id} paused.") - while thread.id in self._pause_ids: + self.log.info(f"{thread} suspended.") + thread.python_frame = frame_obj + while self._stopped_by is not None: self._cvar.wait() - self.log.info(f"Thread {thread.id} unpaused.") + thread.python_frame = None + self.log.info(f"{thread} resumed.") - step = self._steps.get(thread.id, None) + step = self._steps.get(thread, None) if step is not None and step.origin is None: + # This step has just begun - update the Step object with information + # about current frame that will be used to track step completion. step.origin = frame_obj step.origin_line = frame_obj.f_lineno @@ -242,108 +265,79 @@ def _trace_line(self, code: CodeType, line_number: int): return self.monitoring.DISABLE self.log.debug(f"sys.monitoring event: LINE({line_number}, {code})") - frame_obj = self.inspect.currentframe().f_back - stop_reason = None - with self._cvar: - if thread.id in self._pause_ids: - stop_reason = "pause" - - step = self._steps.get(thread.id, None) - is_stepping = step is not None and step.origin is not None - if not is_stepping: - self.log.debug(f"No step in progress on thread {thread.id}.") - else: + # These two local variables hold direct or indirect references to frame + # objects on the stack of the current thread, and thus must be cleaned up + # on exit to avoid expensive GC cycles. 
+ python_frame = self.inspect.currentframe().f_back + frame = None + try: + with self._cvar: + step = self._steps.get(thread, None) + is_stepping = step is not None and step.origin is not None + if not is_stepping: + self.log.debug(f"No step in progress on {thread}.") + else: + self.log.debug( + f"Tracing {step} originating from {step.origin} on {thread}." + ) + if step.is_complete(python_frame): + self.log.info(f"{step} finished on thread {thread}.") + del self._steps[thread] + self.begin_stop(thread, "step") + + if self._stopped_by is not None: + # Even if this thread is pausing, any debugpy internal code on it should + # keep running until it returns to user code; otherwise, it may deadlock + # if it was holding e.g. a messaging lock. + if not python_frame.f_globals.get("__name__", "").startswith( + "debugpy" + ): + self.suspend_this_thread(python_frame) + return + + self.log.debug(f"Resolving path {code.co_filename!r}...") + source = self.Source(code.co_filename) + self.log.debug(f"Path {code.co_filename!r} resolved to {source}.") + + bps = self.Breakpoint.at(source, line_number) + if not bps and not is_stepping: + self.log.debug(f"No breakpoints at {source}:{line_number}.") + return self.monitoring.DISABLE self.log.debug( - f"Tracing step-{step.step} originating from {step.origin} on thread {thread.id}." + f"Considering breakpoints: {[bp.__getstate__() for bp in bps]}." ) - # TODO: use CALL/RETURN/PY_RETURN to track these more efficiently. 
- step_finished = False - if step.step == "in": - if frame_obj is not step.origin or line_number != step.origin_line: - step_finished = True - elif step.step == "out": - step_finished = True - while frame_obj is not None: - if frame_obj is step.origin: - step_finished = False - break - frame_obj = frame_obj.f_back - elif step.step == "over": - step_finished = True - while frame_obj is not None: - if ( - frame_obj is step.origin - and frame_obj.f_lineno == step.origin_line - ): - step_finished = False - break - frame_obj = frame_obj.f_back - else: - raise ValueError(f"Unknown step type: {step.step}") - - if step_finished: - self.log.info(f"Step-{step.step} finished on thread {thread.id}.") - del self._steps[thread.id] - self._pause_ids.add(thread.id) - self._cvar.notify_all() - stop_reason = "step" - - if stop_reason is not None: - # Even if this thread is pausing, any debugpy internal code on it should - # keep running until it returns to user code; otherwise, it may deadlock - # if it was holding e.g. a messaging lock. - print(frame_obj.f_globals.get("__name__")) - if not frame_obj.f_globals.get("__name__", "").startswith("debugpy"): - return self._stop(frame_obj, stop_reason) - - self.log.debug(f"Resolving path {code.co_filename!r}...") - path = self.Path(code.co_filename) - try: - path = path.resolve() - except (OSError, RuntimeError): - pass - self.log.debug(f"Path {code.co_filename!r} resolved to {path}.") - - bps = self.Breakpoint.at(path, line_number) - if not bps and not is_stepping: - self.log.debug(f"No breakpoints at {path}:{line_number}.") - return self.monitoring.DISABLE - self.log.debug(f"Considering breakpoints: {[bp.__getstate__() for bp in bps]}.") - - frame = self.StackFrame(thread, self.inspect.currentframe().f_back) - try: - stop_bps = [] - for bp in bps: - match bp.is_triggered(frame): - case str() as message: - # Triggered, has logMessage - print it but don't stop. 
- self.adapter.channel.send_event( - "output", - { - "category": "console", - "output": message, - "line": line_number, - "source": {"path": path}, - }, - ) - case triggered if triggered: - # Triggered, no logMessage - stop. - stop_bps.append(bp) - case _: - continue - - if stop_bps: - self.log.info( - f"Stack frame {frame} stopping at breakpoints {[bp.__getstate__() for bp in stop_bps]}." - ) - with self._cvar: - self._pause_ids.add(thread.id) - self._cvar.notify_all() - return self._stop(frame.frame_object, "breakpoint", stop_bps) + frame = self.StackFrame(thread, self.inspect.currentframe().f_back) + stop_bps = [] + for bp in bps: + match bp.is_triggered(frame): + case str() as message: + # Triggered, has logMessage - print it but don't stop. + self.adapter.channel.send_event( + "output", + { + "category": "console", + "output": message, + "line": line_number, + "source": source, + }, + ) + case triggered if triggered: + # Triggered, no logMessage - stop. + stop_bps.append(bp) + case _: + continue + + if stop_bps: + self.log.info( + f"Stack frame {frame} stopping at breakpoints {[bp.__getstate__() for bp in stop_bps]}." + ) + self.begin_stop(thread, "breakpoint", stop_bps) + self.suspend_this_thread(frame.frame_object) finally: del frame + del python_frame def _trace_py_start(self, code: CodeType, ip: int): thread = self.Thread.from_python_thread()