diff --git a/src/debugpy/common/util.py b/src/debugpy/common/util.py index 54850a07b..57dfea808 100644 --- a/src/debugpy/common/util.py +++ b/src/debugpy/common/util.py @@ -160,5 +160,6 @@ def hide_thread_from_debugger(thread): DEBUGPY_TRACE_DEBUGPY is used to debug debugpy with debugpy """ if hide_debugpy_internals(): + thread.is_debugpy_thread = True thread.pydev_do_not_trace = True thread.is_pydev_daemon_thread = True diff --git a/src/debugpy/server/__init__.py b/src/debugpy/server/__init__.py index c5abcc84b..e4233bc4b 100644 --- a/src/debugpy/server/__init__.py +++ b/src/debugpy/server/__init__.py @@ -4,6 +4,11 @@ def adapter(): + """ + Returns the instance of Adapter corresponding to the debug adapter that is currently + connected to this process, or None if there is no adapter connected. Use in lieu of + Adapter.instance to avoid import cycles. + """ from debugpy.server.adapters import Adapter return Adapter.instance diff --git a/src/debugpy/server/adapters.py b/src/debugpy/server/adapters.py index 0c9cf2dea..c913f0a4d 100644 --- a/src/debugpy/server/adapters.py +++ b/src/debugpy/server/adapters.py @@ -9,9 +9,9 @@ from debugpy.adapter import components from debugpy.common import json, log, messaging, sockets -from debugpy.common.messaging import Request -from debugpy.server import tracing, eval -from debugpy.server.tracing import Breakpoint, StackFrame +from debugpy.common.messaging import MessageDict, Request +from debugpy.server import eval +from debugpy.server.tracing import Breakpoint, StackFrame, Thread, Tracer class Adapter: @@ -50,13 +50,13 @@ class Expectations(components.Capabilities): server_access_token = None """Access token that the adapter must use to authenticate with this server.""" - _is_initialized: bool = False _has_started: bool = False _client_id: str = None _capabilities: Capabilities = None _expectations: Expectations = None _start_request: messaging.Request = None + _tracer: Tracer = None def __init__(self, stream: messaging.JsonIOStream): self._is_initialized = False @@ -65,6 +65,7 @@ def __init__(self, stream: messaging.JsonIOStream): self._capabilities = None self._expectations = None self._start_request = None + self._tracer = Tracer.instance self.channel = messaging.JsonMessageChannel(stream, self) self.channel.start() @@ -139,6 +140,8 @@ def initialize_request(self, request: Request): ] return { + "exceptionBreakpointFilters": exception_breakpoint_filters, + "supportsClipboardContext": True, "supportsCompletionsRequest": True, "supportsConditionalBreakpoints": True, "supportsConfigurationDoneRequest": True, @@ -148,17 +151,15 @@ def initialize_request(self, request: Request): "supportsExceptionInfoRequest": True, "supportsExceptionOptions": True, "supportsFunctionBreakpoints": True, + "supportsGotoTargetsRequest": True, "supportsHitConditionalBreakpoints": True, "supportsLogPoints": True, "supportsModulesRequest": True, "supportsSetExpression": True, "supportsSetVariable": True, - "supportsValueFormattingOptions": True, - "supportsTerminateRequest": True, - "supportsGotoTargetsRequest": True, - "supportsClipboardContext": True, - "exceptionBreakpointFilters": exception_breakpoint_filters, "supportsStepInTargetsRequest": True, + "supportsTerminateRequest": True, + "supportsValueFormattingOptions": True, } def _handle_start_request(self, request: Request): @@ -189,7 +190,7 @@ def configurationDone_request(self, request: Request): 'or an "attach" request' ) - tracing.start() + self._tracer.start() self._has_started = True request.respond({}) @@ -233,20 +234,28 @@ def setBreakpoints_request(self, request: Request): bps = list(request("breakpoints", json.array(json.object()))) else: lines = request("lines", json.array(int)) - bps = [{"line": line} for line in lines] + bps = [MessageDict(request, {"line": line}) for line in lines] Breakpoint.clear([path]) - bps_set = [Breakpoint.set(path, bp["line"]) for bp in bps] + bps_set = [ + Breakpoint.set( + path, bp["line"], + condition=bp("condition", str, optional=True), + hit_condition=bp("hitCondition", str, optional=True), + log_message=bp("logMessage", str, optional=True), + ) + for bp in bps + ] return {"breakpoints": bps_set} def threads_request(self, request: Request): - return {"threads": tracing.Thread.enumerate()} + return {"threads": Thread.enumerate()} def stackTrace_request(self, request: Request): thread_id = request("threadId", int) start_frame = request("startFrame", 0) - thread = tracing.Thread.get(thread_id) + thread = Thread.get(thread_id) if thread is None: raise request.isnt_valid(f'Invalid "threadId": {thread_id}') @@ -265,7 +274,7 @@ def pause_request(self, request: Request): thread_ids = None else: thread_ids = [request("threadId", int)] - tracing.pause(thread_ids) + self._tracer.pause(thread_ids) return {} def continue_request(self, request: Request): @@ -274,25 +283,25 @@ def continue_request(self, request: Request): else: thread_ids = [request("threadId", int)] single_thread = request("singleThread", False) - tracing.resume(thread_ids if single_thread else None) + self._tracer.resume(thread_ids if single_thread else None) return {} def stepIn_request(self, request: Request): # TODO: support "singleThread" and "granularity" thread_id = request("threadId", int) - tracing.step_in(thread_id) + self._tracer.step_in(thread_id) return {} def stepOut_request(self, request: Request): # TODO: support "singleThread" and "granularity" thread_id = request("threadId", int) - tracing.step_out(thread_id) + self._tracer.step_out(thread_id) return {} def next_request(self, request: Request): # TODO: support "singleThread" and "granularity" thread_id = request("threadId", int) - tracing.step_over(thread_id) + self._tracer.step_over(thread_id) return {} def scopes_request(self, request: Request): @@ -316,18 +325,18 @@ def evaluate_request(self, request: Request): return {"result": var.repr, "variablesReference": var.id} def disconnect_request(self, request: Request): - tracing.Breakpoint.clear() - tracing.abandon_step() - tracing.resume() + Breakpoint.clear() + self._tracer.abandon_step() + self._tracer.resume() return {} def terminate_request(self, request: Request): - tracing.Breakpoint.clear() - tracing.abandon_step() - tracing.resume() + Breakpoint.clear() + self._tracer.abandon_step() + self._tracer.resume() return {} def disconnect(self): - tracing.resume() + self._tracer.resume() self.connected_event.clear() return {} diff --git a/src/debugpy/server/eval.py b/src/debugpy/server/eval.py index c9fe4d1d6..cfa1e8db6 100644 --- a/src/debugpy/server/eval.py +++ b/src/debugpy/server/eval.py @@ -2,29 +2,28 @@ # Licensed under the MIT License. See LICENSE in the project root # for license information. +import debugpy import threading - from collections.abc import Iterable +from debugpy.server.inspect import inspect from types import FrameType from typing import ClassVar, Dict, Literal, Self -from debugpy.server import tracing -from debugpy.server.inspect import inspect - -ScopeKind = Literal["global", "nonlocal", "local"] +type ScopeKind = Literal["global", "nonlocal", "local"] +type StackFrame = "debugpy.server.tracing.StackFrame" _lock = threading.RLock() class VariableContainer: - frame: "tracing.StackFrame" + frame: StackFrame id: int _last_id: ClassVar[int] = 0 _all: ClassVar[Dict[int, "VariableContainer"]] = {} - def __init__(self, frame: "tracing.StackFrame"): + def __init__(self, frame: StackFrame): self.frame = frame with _lock: VariableContainer._last_id += 1 @@ -46,7 +45,7 @@ def variables(self) -> Iterable["Variable"]: raise NotImplementedError @classmethod - def invalidate(self, *frames: Iterable["tracing.StackFrame"]) -> None: + def invalidate(self, *frames: Iterable[StackFrame]) -> None: with _lock: ids = [ id @@ -61,7 +60,7 @@ class Scope(VariableContainer): frame: FrameType kind: ScopeKind - def __init__(self, frame: "tracing.StackFrame", kind: ScopeKind): + def __init__(self, frame: StackFrame, kind: ScopeKind): super().__init__(frame) self.kind = kind @@ -92,7 +91,7 @@ class Variable(VariableContainer): value: object # TODO: evaluateName, memoryReference, presentationHint - def __init__(self, frame: "tracing.StackFrame", name: str, value: object): + def __init__(self, frame: StackFrame, name: str, value: object): super().__init__(frame) self.name = name self.value = value diff --git a/src/debugpy/server/tracing.py b/src/debugpy/server/tracing.py deleted file mode 100644 index 007f5b781..000000000 --- a/src/debugpy/server/tracing.py +++ /dev/null @@ -1,540 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See LICENSE in the project root -# for license information. - -import inspect -import sys -import threading -import traceback - -from contextlib import contextmanager -from collections import defaultdict -from dataclasses import dataclass, field -from pathlib import Path -from sys import monitoring -from types import CodeType, FrameType -from typing import ClassVar, Dict, Iterable, List, Literal, Union - -from debugpy.server import adapter -from debugpy.server.eval import Scope, VariableContainer - -# Shared for all global state pertaining to breakpoints and stepping. -_cvar = threading.Condition() - -# IDs of threads that are currently pausing or paused. -_pause_ids = set() - -_steps = {} - - -@contextmanager -def cvar(who): - #print(end=f"ACQUIRING {who}\n") - with _cvar: - #print(end=f"ACQUIRED {who}\n") - yield - #print(end=f"RELEASING {who}\n") - #print(end=f"RELEASED {who}\n") - - -@dataclass -class Thread: - id: int = field(init=False) - thread: threading.Thread - - def __post_init__(self): - # TODO: map 32-bit DAP thread IDs to (potentially) 64-bit Python thread IDs. - # Otherwise, large thread IDs (common on Linux) will be truncated when they are serialized as JSON. - self.id = self.thread.ident - - def __getstate__(self): - return { - "id": self.id, - "name": self.thread.name, - } - - @property - def is_traced(self): - return not getattr(self.thread, "pydev_do_not_trace", False) - - @property - def name(self): - return self.thread.name - - @classmethod - def enumerate(self) -> List["Thread"]: - return [ - thread - for t in threading.enumerate() - for thread in [Thread(t)] - if thread.is_traced - ] - - @classmethod - def get(self, id: int) -> Union["Thread", None]: - for thread in self.enumerate(): - if thread.id == id: - return thread - return None - - def stack_trace(self) -> Iterable["StackFrame"]: - try: - (fobj,) = (fobj for (id, fobj) in sys._current_frames().items() if id == self.id) - except ValueError: - raise ValueError(f"Can't get frames for inactive Thread({self.id})") - for fobj, _ in traceback.walk_stack(fobj): - frame = StackFrame.from_frame_object(self, fobj) - if not frame.is_internal(): - yield frame - - -@dataclass -class StackFrame: - thread: Thread - frame_object: FrameType - - id: int = field(init=False) - _path: Path = field(init=False) - _scopes: List[Scope] = field(init=False, default=None) - - _last_id: ClassVar[int] = 0 - _all: ClassVar[Dict[int, "StackFrame"]] = {} - - def __post_init__(self): - StackFrame._last_id += 1 - self.id = StackFrame._last_id - self._path = None - self._all[self.id] = self - - def __getstate__(self): - return { - "id": self.id, - "name": self.frame_object.f_code.co_name, - "source": { - # TODO: use "sourceReference" when path isn't available (e.g. decompiled code) - "path": str(self.path()), - }, - "line": self.frame_object.f_lineno, - "column": 1, # TODO - # TODO: "endLine", "endColumn", "moduleId", "instructionPointerReference" - } - - @property - def line(self) -> int: - return self.frame_object.f_lineno - - def path(self) -> Path: - if self._path is None: - path = Path(self.frame_object.f_code.co_filename) - try: - path = path.resolve() - except (OSError, RuntimeError): - pass - # No need to sync this. - self._path = path - return self._path - - def is_internal(self) -> bool: - # TODO: filter internal frames properly - parts = self.path().parts - internals = ["debugpy", "threading"] - return any(part.startswith(s) for s in internals for part in parts) - - @classmethod - def get(self, id: int) -> "StackFrame": - return self._all.get(id, None) - - @classmethod - def from_frame_object(self, thread: Thread, frame_object: FrameType) -> "StackFrame": - for frame in self._all.values(): - if frame.thread.id == thread.id and frame.frame_object is frame_object: - return frame - return StackFrame(thread, frame_object) - - def scopes(self) -> List[Scope]: - if self._scopes is None: - self._scopes = [ - Scope(self.frame_object, "local"), - Scope(self.frame_object, "global"), - ] - return self._scopes - - @classmethod - def invalidate(self, thread_id: int): - frames = [frame for frame in self._all.values() if frame.thread.id == thread_id] - VariableContainer.invalidate(*frames) - - -@dataclass -class Step: - step: Literal["in", "out", "over"] - origin: FrameType = None - origin_line: int = None - - -@dataclass -class Breakpoint: - path: Path - line: int - is_enabled: bool = True - - id: int = field(init=False) - - _last_id: ClassVar[int] = 0 - - _all: ClassVar[Dict[int, "Breakpoint"]] = {} - - _at: ClassVar[Dict[Path, Dict[int, List["Breakpoint"]]]] = defaultdict( - lambda: defaultdict(lambda: []) - ) - - def __post_init__(self): - Breakpoint._last_id += 1 - self.id = Breakpoint._last_id - with cvar(1): - self._all[self.id] = self - self._at[self.path][self.line].append(self) - _cvar.notify_all() - - def __getstate__(self): - return { - "line": self.line, - "verified": True, # TODO - } - - def is_hit(self, frame: StackFrame): - with cvar(2): - # Check path last since path resolution is potentially expensive. - return ( - self.is_enabled - and frame.line == self.line - and frame.path() == self.path - ) - - @classmethod - def at(self, path: str, line: int) -> List["Breakpoint"]: - with cvar(3): - return self._at[path][line] - - @classmethod - def clear(self, paths: Iterable[str] = None): - #print("clear-bp", paths) - if paths is not None: - paths = [Path(path).resolve() for path in paths] - with cvar(4): - if paths is None: - paths = list(self._at.keys()) - for path in paths: - bps_in = self._at.pop(path, {}).values() - for bps_at in bps_in: - for bp in bps_at: - del self._all[bp.id] - _cvar.notify_all() - monitoring.restart_events() - - @classmethod - def set(self, path: str, line: int) -> "Breakpoint": - try: - path = Path(path).resolve() - except (OSError, RuntimeError): - pass - #print("set-bp", path, line) - bp = Breakpoint(path, line) - monitoring.restart_events() - return bp - - def enable(self, is_enabled: bool): - with cvar(5): - self.is_enabled = is_enabled - _cvar.notify_all() - - -def start(): - for thread in Thread.enumerate(): - adapter().channel.send_event( - "thread", - { - "reason": "started", - "threadId": thread.id, - "name": thread.name, - }, - ) - - monitoring.use_tool_id(monitoring.DEBUGGER_ID, "debugpy") - monitoring.set_events( - monitoring.DEBUGGER_ID, - ( - monitoring.events.LINE - | monitoring.events.PY_START - | monitoring.events.PY_RETURN - | monitoring.events.PY_RESUME - | monitoring.events.PY_YIELD - | monitoring.events.PY_THROW - | monitoring.events.PY_UNWIND - | monitoring.events.RAISE - | monitoring.events.RERAISE - | monitoring.events.EXCEPTION_HANDLED - ), - ) - - trace_funcs = { - monitoring.events.LINE: _trace_line, - monitoring.events.PY_START: _trace_py_start, - monitoring.events.PY_RESUME: _trace_py_resume, - monitoring.events.PY_RETURN: _trace_py_return, - monitoring.events.PY_YIELD: _trace_py_yield, - monitoring.events.PY_THROW: _trace_py_throw, - monitoring.events.PY_UNWIND: _trace_py_unwind, - monitoring.events.RAISE: _trace_raise, - monitoring.events.RERAISE: _trace_reraise, - monitoring.events.EXCEPTION_HANDLED: _trace_exception_handled, - } - for event, func in trace_funcs.items(): - monitoring.register_callback(monitoring.DEBUGGER_ID, event, func) - - -def pause(thread_ids: List[int] = None): - #print(f"PAUSE {thread_ids=}") - if thread_ids is None: - thread_ids = [thread.id for thread in Thread.enumerate()] - - # TODO: handle race between the above and new threads starting when doing pause-the-world. - with cvar(6): - _pause_ids.update(thread_ids) - _cvar.notify_all() - monitoring.restart_events() - - -def resume(thread_ids: List[int] = None): - #print(f"RESUME {thread_ids=}") - with cvar(7): - if thread_ids is None: - _pause_ids.clear() - else: - _pause_ids.difference_update(thread_ids) - _cvar.notify_all() - monitoring.restart_events() - - -def abandon_step(thread_ids: List[int] = None): - #print(f"ABANDON_STEP {thread_ids=}") - with cvar(8): - if thread_ids is None: - thread_ids = [thread.id for thread in Thread.enumerate()] - for thread_id in thread_ids: - _steps.pop(thread_id, None) - _cvar.notify_all() - monitoring.restart_events() - - -def step_in(thread_id: int): - with cvar(9): - _steps[thread_id] = Step("in") - _pause_ids.clear() - _cvar.notify_all() - monitoring.restart_events() - - -def step_out(thread_id: int): - with cvar(10): - _steps[thread_id] = Step("out") - _pause_ids.clear() - _cvar.notify_all() - monitoring.restart_events() - - -def step_over(thread_id: int): - with cvar(11): - _steps[thread_id] = Step("over") - _pause_ids.clear() - _cvar.notify_all() - monitoring.restart_events() - - -# On shutdown, modules go away (become None), but _trace_line is still invoked. -DISABLE = monitoring.DISABLE - - -def _stop(frame_obj: FrameType, reason: str, hit_breakpoints: Iterable[Breakpoint] = ()): - thread_id = threading.get_ident() - #print(f"STOP {thread_id=}, {reason=}, {hit_breakpoints=}") - with cvar(12): - if thread_id not in _pause_ids: - #print("STOP: not paused") - return - - #print("SENDING...") - adapter().channel.send_event( - "stopped", - { - "reason": reason, - "threadId": threading.get_ident(), - "allThreadsStopped": False, # TODO - "hitBreakpointIds": [bp.id for bp in hit_breakpoints], - }, - ) - #print("SENT!") - - #print(f"BLOCK {thread_id=}") - while thread_id in _pause_ids: - _cvar.wait() - #print(f"UNBLOCK {thread_id=}") - - step = _steps.get(thread_id, None) - if step is not None and step.origin is None: - step.origin = frame_obj - step.origin_line = frame_obj.f_lineno - - -def _trace_line(code: CodeType, line_number: int): - if monitoring is None: - return DISABLE - - thread = Thread(threading.current_thread()) - if not thread.is_traced: - return DISABLE - - stop_reason = None - with cvar(13): - if thread.id in _pause_ids: - stop_reason = "pause" - - step = _steps.get(thread.id, None) - is_stepping = step is not None and step.origin is not None - if is_stepping: - # TODO: use CALL/RETURN/PY_RETURN to track these more efficiently. - frame_obj = inspect.currentframe().f_back - step_finished = False - if step.step == "in": - if frame_obj is not step.origin or line_number != step.origin_line: - step_finished = True - elif step.step == "out": - step_finished = True - while frame_obj is not None: - if frame_obj is step.origin: - step_finished = False - break - frame_obj = frame_obj.f_back - elif step.step == "over": - step_finished = True - while frame_obj is not None: - if frame_obj is step.origin and frame_obj.f_lineno == step.origin_line: - step_finished = False - break - frame_obj = frame_obj.f_back - else: - raise ValueError(f"Unknown step type: {step.step}") - - if step_finished: - del _steps[thread.id] - _pause_ids.add(thread.id) - _cvar.notify_all() - stop_reason = "step" - - if stop_reason is not None: - return _stop(inspect.currentframe().f_back, stop_reason) - - path = Path(code.co_filename) - try: - path = path.resolve() - except (OSError, RuntimeError): - pass - # print(f"TRACE_LINE {thread_id=}, {path=}, {line_number=}") - - bps = Breakpoint.at(path, line_number) - if not bps and not is_stepping: - return DISABLE - - frame = StackFrame(thread, inspect.currentframe().f_back) - try: - bps_hit = [bp for bp in bps if bp.is_hit(frame)] - if bps_hit: - #print("!BREAKPOINT HIT!") - with cvar(14): - _pause_ids.add(thread.id) - _cvar.notify_all() - return _stop(frame.frame_object, "breakpoint", bps_hit) - finally: - del frame - - -def _trace_py_start(code: CodeType, ip: int): - if threading.current_thread() is not threading.main_thread(): - return - #print(f"TRACE_PY_START {code=}, {ip=}") - - -def _trace_py_resume(code: CodeType, ip: int): - if threading.current_thread() is not threading.main_thread(): - return - #print(f"TRACE_PY_RESUME {code=}, {ip=}") - - -def _trace_py_return(code: CodeType, ip: int, retval: object): - if threading.current_thread() is not threading.main_thread(): - return - try: - retval = repr(retval) - except: - retval = "" - #print(f"TRACE_PY_RETURN {code=}, {ip=}, {retval=}") - - -def _trace_py_yield(code: CodeType, ip: int, retval: object): - if threading.current_thread() is not threading.main_thread(): - return - try: - retval = repr(retval) - except: - retval = "" - #print(f"TRACE_PY_YIELD {code=}, {ip=}, {retval=}") - - -def _trace_py_throw(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_PY_THROW {code=}, {ip=}, {exc=}") - - -def _trace_py_unwind(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_PY_UNWIND {code=}, {ip=}, {exc=}") - - -def _trace_raise(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_RAISE {code=}, {ip=}, {exc=}") - - -def _trace_reraise(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_RERAISE {code=}, {ip=}, {exc=}") - - -def _trace_exception_handled(code: CodeType, ip: int, exc: BaseException): - if threading.current_thread() is not threading.main_thread(): - return - try: - exc = repr(exc) - except: - exc = "" - #print(f"TRACE_EXCEPTION_HANDLED {code=}, {ip=}, {exc=}") diff --git a/src/debugpy/server/tracing/__init__.py b/src/debugpy/server/tracing/__init__.py new file mode 100644 index 000000000..3195d8a2f --- /dev/null +++ b/src/debugpy/server/tracing/__init__.py @@ -0,0 +1,531 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in the project root +# for license information. + +import re +import sys +import threading +import traceback +from collections import defaultdict +from dataclasses import dataclass +from debugpy import server +from debugpy.common import log +from debugpy.server.eval import Scope, VariableContainer +from pathlib import Path +from sys import monitoring +from types import CodeType, FrameType +from typing import Callable, ClassVar, Dict, Iterable, List, Literal, Union + +# Shared for all global state pertaining to breakpoints and stepping. +_cvar = threading.Condition() + + +class Thread: + """ + Represents a DAP Thread object. Instances must never be created directly; + use Thread.from_python_thread() instead. + """ + + id: int + """DAP ID of this thread. Distinct from thread.ident.""" + + python_thread: threading.Thread + """The Python thread object this DAP Thread represents.""" + + is_known_to_adapter: bool + """ + Whether this thread has been reported to the adapter via the + DAP "thread" event with "reason":"started". + """ + + _last_id = 0 + _all: ClassVar[Dict[int, "Thread"]] = {} + + def __init__(self, python_thread): + """ + Create a new Thread object for the given thread. Do not invoke directly; + use Thread.get() instead. + """ + self.python_thread = python_thread + self.is_known_to_adapter = False + + with _cvar: + # Thread IDs are serialized as JSON numbers in DAP, which are handled as 64-bit + # floats by most DAP clients. However, OS thread IDs can be large 64-bit integers + # on some platforms. To avoid loss of precision, we map all thread IDs to 32-bit + # signed integers; if the original ID fits, we use it as is, otherwise we use a + # generated negative ID that is guaranteed to fit. + self.id = self.python_thread.ident + if self.id != float(self.id): + Thread._last_id -= 1 + self.id = Thread._last_id + self._all[self.id] = self + + log.info( + f"DAP Thread(id={self.id}) created for Python Thread(ident={self.python_thread.ident})" + ) + + def __getstate__(self): + return { + "id": self.id, + "name": self.name, + } + + @property + def is_debugpy_thread(self): + return getattr(self.python_thread, "is_debugpy_thread", False) + + @property + def is_traced(self): + return not self.is_debugpy_thread + + @property + def name(self): + return self.python_thread.name + + @classmethod + def from_python_thread(self, python_thread: threading.Thread = None) -> "Thread": + """ + Returns the DAP Thread object corresponding to the given Python thread, or for + the current Python thread if None, creating it and reporting it to adapter if + necessary. + """ + if python_thread is None: + python_thread = threading.current_thread() + with _cvar: + for thread in self._all.values(): + if thread.python_thread is python_thread: + break + else: + thread = Thread(python_thread) + thread.make_known_to_adapter() + return thread + + @classmethod + def get(self, id: int) -> Union["Thread", None]: + """ + Finds a thread by its DAP ID. Returns None if ID is unknown. + """ + with _cvar: + return self._all.get(id, None) + + @classmethod + def enumerate(self) -> List["Thread"]: + """ + Returns a list of all running threads in this process. + """ + return [ + thread + for python_thread in threading.enumerate() + for thread in [Thread.from_python_thread(python_thread)] + if thread.is_traced + ] + + def make_known_to_adapter(self): + """ + If adapter is connected to this process, reports this thread to it via DAP + "thread" event with "reason":"started" if it hasn't been reported already. + Returns True if thread is now known to the adapter, and False if there was + no adapter to report it to. + """ + with _cvar: + if not self.is_traced: + return False + if self.is_known_to_adapter: + return True + adapter = server.adapter() + if adapter is None: + return False + adapter.channel.send_event( + "thread", + { + "reason": "started", + "threadId": self.id, + "name": self.name, + }, + ) + self.is_known_to_adapter = True + return True + + def stack_trace(self) -> Iterable["StackFrame"]: + """ + Returns an iterable of StackFrame objects for the current stack of this thread, + starting with the topmost frame. + """ + try: + (fobj,) = ( + fobj for (id, fobj) in sys._current_frames().items() if id == self.id + ) + except ValueError: + raise ValueError(f"Can't get frames for inactive Thread({self.id})") + for fobj, _ in traceback.walk_stack(fobj): + frame = StackFrame.from_frame_object(self, fobj) + if not frame.is_internal(): + yield frame + + +class StackFrame: + """ + Represents a DAP StackFrame object. Instances must never be created directly; + use StackFrame.from_frame_object() instead. + """ + + thread: Thread + frame_object: FrameType + + id: int + _path: Path + _scopes: List[Scope] + + _last_id = 0 + _all: ClassVar[Dict[int, "StackFrame"]] = {} + + def __init__(self, thread: Thread, frame_object: FrameType): + """ + Create a new StackFrame object for the given thread and frame object. Do not + invoke directly; use StackFrame.from_frame_object() instead. + """ + StackFrame._last_id += 1 + self.id = StackFrame._last_id + self.thread = thread + self.frame_object = frame_object + self._path = None + self._scopes = None + self._all[self.id] = self + + def __getstate__(self): + return { + "id": self.id, + "name": self.frame_object.f_code.co_name, + "source": { + # TODO: use "sourceReference" when path isn't available (e.g. decompiled code) + "path": str(self.path()), + }, + "line": self.frame_object.f_lineno, + "column": 1, # TODO + # TODO: "endLine", "endColumn", "moduleId", "instructionPointerReference" + } + + @property + def line(self) -> int: + return self.frame_object.f_lineno + + def path(self) -> Path: + if self._path is None: + path = Path(self.frame_object.f_code.co_filename) + try: + path = path.resolve() + except (OSError, RuntimeError): + pass + # No need to sync this since all instances are equivalent. + self._path = path + return self._path + + def is_internal(self) -> bool: + # TODO: filter internal frames properly + parts = self.path().parts + internals = ["debugpy", "threading"] + return any(part.startswith(s) for s in internals for part in parts) + + @classmethod + def from_frame_object( + self, thread: Thread, frame_object: FrameType + ) -> "StackFrame": + for frame in self._all.values(): + if frame.thread is thread and frame.frame_object is frame_object: + return frame + return StackFrame(thread, frame_object) + + @classmethod + def get(self, id: int) -> "StackFrame": + return self._all.get(id, None) + + def scopes(self) -> List[Scope]: + if self._scopes is None: + self._scopes = [ + Scope(self.frame_object, "local"), + Scope(self.frame_object, "global"), + ] + return self._scopes + + @classmethod + def invalidate(self, thread_id: int): + frames = [frame for frame in self._all.values() if frame.thread.id == thread_id] + VariableContainer.invalidate(*frames) + + +@dataclass +class Step: + step: Literal["in", "out", "over"] + origin: FrameType = None + origin_line: int = None + + +class Condition: + """ + Expression that must be true for the breakpoint to be triggered. + """ + + expression: str + """Python expression that must evaluate to True for the breakpoint to be triggered.""" + + _code: CodeType + + def __init__(self, breakpoint: "Breakpoint", expression: str): + self.expression = expression + self._code = compile( + expression, f"breakpoint-{breakpoint.id}-condition", "eval" + ) + + def test(self, frame: StackFrame) -> bool: + """ + Returns True if the breakpoint should be triggered in the specified frame. + """ + try: + return bool( + eval( + self._code, + frame.frame_object.f_globals, + frame.frame_object.f_locals, + ) + ) + except: + log.exception( + f"Exception while evaluating breakpoint condition: {self.expression}" + ) + return False + + +class HitCondition: + """ + Hit count expression that must be True for the breakpoint to be triggered. + + Must have the format `[]`, where is a positive integer literal, + and is one of `==` `>` `>=` `<` `<=` `%`, defaulting to `==` if unspecified. + + Examples: + 5: break on the 5th hit + ==5: ditto + >5: break on every hit after the 5th + >=5: break on the 5th hit and thereafter + %5: break on every 5th hit + """ + + _OPERATORS = { + "==": lambda expected_count, count: count == expected_count, + ">": lambda expected_count, count: count > expected_count, + ">=": lambda expected_count, count: count >= expected_count, + "<": lambda expected_count, count: count < expected_count, + "<=": lambda expected_count, count: count <= expected_count, + "%": lambda expected_count, count: count % expected_count == 0, + } + + hit_condition: str + _count: int + _operator: Callable[[int, int], bool] + + def __init__(self, hit_condition: str): + self.hit_condition = hit_condition + m = re.match(r"([<>=]+)?(\d+)", hit_condition) + if not m: + raise ValueError(f"Invalid hit condition: {hit_condition}") + self._count = int(m.group(2)) + try: + op = self._OPERATORS[m.group(1) or "=="] + except KeyError: + raise ValueError(f"Invalid hit condition operator: {op}") + self.test = lambda count: op(self._count, count) + + def test(self, count: int) -> bool: + """ + Returns True if the breakpoint should be triggered on the given hit count. + """ + # __init__ replaces this method with an actual implementation from _OPERATORS + # when it parses the condition. + raise NotImplementedError + + +class LogMessage: + """ + A message with spliced expressions, to be logged when a breakpoint is triggered. + """ + + message: str + """The message to be logged. May contain expressions in curly braces.""" + + _code: CodeType + """Compiled code object for the f-string corresponding to the message.""" + + def __init__(self, breakpoint: "Breakpoint", message: str): + self.message = message + f_string = "f" + repr(message) + self._code = compile(f_string, f"breakpoint-{breakpoint.id}-logMessage", "eval") + + def format(self, frame: StackFrame) -> str: + """ + Formats the message using the specified frame's locals and globals. + """ + try: + return eval( + self._code, frame.frame_object.f_globals, frame.frame_object.f_locals + ) + except: + log.exception( + f"Exception while formatting breakpoint log message: {self.message}" + ) + return self.message + + +class Breakpoint: + """ + Represents a DAP Breakpoint. + """ + + id: int + path: Path + line: int + is_enabled: bool + + condition: Condition | None + + hit_condition: HitCondition | None + + log_message: LogMessage | None + + hit_count: int + """Number of times this breakpoint has been hit.""" + + _last_id = 0 + + _all: ClassVar[Dict[int, "Breakpoint"]] = {} + + _at: ClassVar[Dict[Path, Dict[int, List["Breakpoint"]]]] = defaultdict( + lambda: defaultdict(lambda: []) + ) + + def __init__( + self, path, line, *, condition=None, hit_condition=None, log_message=None + ): + with _cvar: + Breakpoint._last_id += 1 + self.id = Breakpoint._last_id + + self.path = path + self.line = line + self.is_enabled = True + self.condition = Condition(self, condition) if condition else None + self.hit_condition = HitCondition(hit_condition) if hit_condition else None + self.log_message = LogMessage(self, log_message) if log_message else None + self.hit_count = 0 + + with _cvar: + self._all[self.id] = self + self._at[self.path][self.line].append(self) + _cvar.notify_all() + + def __getstate__(self): + return { + "line": self.line, + "verified": True, # TODO + } + + @classmethod + def at(self, path: str, line: int) -> List["Breakpoint"]: + """ + Returns a list of all breakpoints at the specified location. + """ + with _cvar: + return self._at[path][line] + + @classmethod + def clear(self, paths: Iterable[str] = None): + """ + Removes all breakpoints in the specified files, or all files if None. + """ + if paths is not None: + paths = [Path(path).resolve() for path in paths] + with _cvar: + if paths is None: + paths = list(self._at.keys()) + for path in paths: + bps_in = self._at.pop(path, {}).values() + for bps_at in bps_in: + for bp in bps_at: + del self._all[bp.id] + _cvar.notify_all() + monitoring.restart_events() + + @classmethod + def set( + self, + path: str, + line: int, + *, + condition=None, + hit_condition=None, + log_message=None, + ) -> "Breakpoint": + """ + Creates a new breakpoint at the specified location. + """ + try: + path = Path(path).resolve() + except (OSError, RuntimeError): + pass + bp = Breakpoint( + path, + line, + condition=condition, + hit_condition=hit_condition, + log_message=log_message, + ) + monitoring.restart_events() + return bp + + def enable(self, is_enabled: bool): + """ + Enables or disables this breakpoint. + """ + with _cvar: + self.is_enabled = is_enabled + _cvar.notify_all() + + def is_triggered(self, frame: StackFrame) -> bool | str: + """ + Determines whether this breakpoint is triggered by the current line in the + specified stack frame, and updates its hit count. + + If the breakpoint is triggered, returns a truthy value; if the breakpoint has + a log message, it is formatted and returned, otherwise True is returned. + """ + with _cvar: + # Check path last since path resolution is potentially expensive. + if ( + not self.is_enabled + or frame.line != self.line + or frame.path() != self.path + ): + return False + + # Hit count must be updated even if conditions are false and execution + # isn't stopped. + self.hit_count += 1 + + # Check hit_condition first since it is faster than checking condition. + if self.hit_condition is not None and not self.hit_condition.test( + self.hit_count + ): + return False + if self.condition is not None and not self.condition.test(frame): + return False + + # If this is a logpoint, return the formatted message instead of True. + if self.log_message is not None: + return self.log_message.format(frame) + + return True + + +# sys.monitoring callbacks are defined in a separate submodule to enable tighter +# control over their use of global state; see comment there for details. +from .tracer import Tracer # noqa diff --git a/src/debugpy/server/tracing/tracer.py b/src/debugpy/server/tracing/tracer.py new file mode 100644 index 000000000..71f2ca509 --- /dev/null +++ b/src/debugpy/server/tracing/tracer.py @@ -0,0 +1,417 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in the project root +# for license information. + +# Once callbacks are registered they are invoked even during finalization when the +# Python is shutting down. Thus, trace_* methods, and any other methods that they +# invoke, must not use any globals from this or other modules (including globals +# that represent imported modules or defined classes!) until it checks that they +# are present, or preload them into class or instance attributes in advance. +# To facilitate this, Tracer is defined in a separate submodule which should not +# contain ANY top-level imports other than typing nor definitions other than the +# class itself. All other imports must be done in class scope and then referred to +# from methods via self. + +from types import CodeType, FrameType +from typing import Iterable + + +class Log: + """ + Safe logging for Tracer. Delegates to debugpy.common.log, but only when it is + safe to do so (i.e. not during finalization). + """ + + from debugpy.common import log + + def __init__(self): + import atexit + + def nop(*args, **kwargs): + pass + + @atexit.register + def disable(): + self.debug = self.info = self.warning = self.error = self.exception = nop + + def debug(self, *args, **kwargs): + # self.log.debug("{0}", *args, **kwargs) + # print(*args) + pass + + def info(self, *args, **kwargs): + self.log.info("{0}", *args, **kwargs) + + def warning(self, *args, **kwargs): + self.log.warning("{0}", *args, **kwargs) + + def error(self, *args, **kwargs): + self.log.error("{0}", *args, **kwargs) + + def exception(self, *args, **kwargs): + self.log.exception("{0}", *args, **kwargs) + + +class Tracer: + """ + Singleton that manages sys.monitoring callbacks for this process. + """ + + import inspect + import threading + from debugpy import server + from debugpy.server.tracing import Breakpoint, Step, Thread, StackFrame, _cvar + from pathlib import Path + from sys import monitoring + + instance: "Tracer" + + log: Log + + _pause_ids = set() + """IDs of threads that are currently pausing or paused.""" + + _steps = {} + """Ongoing steps, keyed by thread ID.""" + + def __init__(self): + self.log = Log() + + @property + def adapter(self): + return self.server.adapter() + + def start(self): + """ + Register sys.monitoring tracing callbacks. + """ + + self.log.info("Registering sys.monitoring tracing callbacks...") + + self.monitoring.use_tool_id(self.monitoring.DEBUGGER_ID, "debugpy") + self.monitoring.set_events( + self.monitoring.DEBUGGER_ID, + ( + self.monitoring.events.LINE + | self.monitoring.events.PY_START + | self.monitoring.events.PY_RETURN + | self.monitoring.events.PY_RESUME + | self.monitoring.events.PY_YIELD + | self.monitoring.events.PY_THROW + | self.monitoring.events.PY_UNWIND + | self.monitoring.events.RAISE + | self.monitoring.events.RERAISE + | self.monitoring.events.EXCEPTION_HANDLED + ), + ) + trace_funcs = { + self.monitoring.events.LINE: self._trace_line, + self.monitoring.events.PY_START: self._trace_py_start, + self.monitoring.events.PY_RESUME: self._trace_py_resume, + self.monitoring.events.PY_RETURN: self._trace_py_return, + self.monitoring.events.PY_YIELD: self._trace_py_yield, + self.monitoring.events.PY_THROW: self._trace_py_throw, + self.monitoring.events.PY_UNWIND: self._trace_py_unwind, + self.monitoring.events.RAISE: self._trace_raise, + self.monitoring.events.RERAISE: self._trace_reraise, + self.monitoring.events.EXCEPTION_HANDLED: self._trace_exception_handled, + } + for event, func in trace_funcs.items(): + self.monitoring.register_callback(self.monitoring.DEBUGGER_ID, event, func) + + self.log.info("sys.monitoring tracing callbacks registered.") + + def pause(self, thread_ids: Iterable[int] = None): + """ + Pause the specified threads, or all threads if thread_ids is None. + """ + if thread_ids is None: + # Pausing is async, so additional threads may be spawned even as we are + # trying to pause the ones we currently know about; iterate until all + # known threads are paused, and no new threads appear. + while True: + thread_ids = {thread.id for thread in self.Thread.enumerate()} + if self._pause_ids.keys() == thread_ids: + return + self.pause(thread_ids) + else: + self.log.info(f"Pausing threads: {thread_ids}") + with self._cvar: + self._pause_ids.update(thread_ids) + self._cvar.notify_all() + self.monitoring.restart_events() + + def resume(self, thread_ids: Iterable[int] = None): + """ + Resume the specified threads, or all threads if thread_ids is None. + """ + with self._cvar: + if thread_ids is None: + self.log.info("Resuming all threads.") + self._pause_ids.clear() + else: + self.log.info(f"Resuming threads: {thread_ids}") + self._pause_ids.difference_update(thread_ids) + self._cvar.notify_all() + self.monitoring.restart_events() + + def abandon_step(self, thread_ids: Iterable[int] = None): + """ + Abandon any ongoing steps that are in progress on the specified threads + (all threads if thread_ids is None). + """ + with self._cvar: + if thread_ids is None: + thread_ids = [thread.id for thread in self.Thread.enumerate()] + for thread_id in thread_ids: + step = self._steps.pop(thread_id, None) + if step is not None: + self.log.info(f"Abandoned step-{step.step} on {thread_id}.") + self._cvar.notify_all() + self.monitoring.restart_events() + + def step_in(self, thread_id: int): + """ + Step into the next statement executed by the specified thread. + """ + self.log.info(f"Step in on thread {thread_id}.") + with self._cvar: + self._steps[thread_id] = self.Step("in") + self._pause_ids.clear() + self._cvar.notify_all() + self.monitoring.restart_events() + + def step_out(self, thread_id: int): + """ + Step out of the current function executed by the specified thread. + """ + self.log.info(f"Step out on thread {thread_id}.") + with self._cvar: + self._steps[thread_id] = self.Step("out") + self._pause_ids.clear() + self._cvar.notify_all() + self.monitoring.restart_events() + + def step_over(self, thread_id: int): + self.log.info(f"Step over on thread {thread_id}.") + """ + Step over the next statement executed by the specified thread. + """ + with self._cvar: + self._steps[thread_id] = self.Step("over") + self._pause_ids.clear() + self._cvar.notify_all() + self.monitoring.restart_events() + + def _stop( + self, + frame_obj: FrameType, + reason: str, + hit_breakpoints: Iterable[Breakpoint] = (), + ): + thread = self.Thread.from_python_thread() + self.log.info(f"Pausing thread {thread.id}: {reason}.") + + with self._cvar: + if thread.id not in self._pause_ids: + return + + self.adapter.channel.send_event( + "stopped", + { + "reason": reason, + "threadId": thread.id, + "allThreadsStopped": False, # TODO + "hitBreakpointIds": [bp.id for bp in hit_breakpoints], + }, + ) + + self.log.info(f"Thread {thread.id} paused.") + while thread.id in self._pause_ids: + self._cvar.wait() + self.log.info(f"Thread {thread.id} unpaused.") + + step = self._steps.get(thread.id, None) + if step is not None and step.origin is None: + step.origin = frame_obj + step.origin_line = frame_obj.f_lineno + + def _trace_line(self, code: CodeType, line_number: int): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + + self.log.debug(f"sys.monitoring event: LINE({line_number}, {code})") + frame_obj = self.inspect.currentframe().f_back + + stop_reason = None + with self._cvar: + if thread.id in self._pause_ids: + stop_reason = "pause" + + step = self._steps.get(thread.id, None) + is_stepping = step is not None and step.origin is not None + if not is_stepping: + self.log.debug(f"No step in progress on thread {thread.id}.") + else: + self.log.debug( + f"Tracing step-{step.step} originating from {step.origin} on thread {thread.id}." + ) + + # TODO: use CALL/RETURN/PY_RETURN to track these more efficiently. + step_finished = False + if step.step == "in": + if frame_obj is not step.origin or line_number != step.origin_line: + step_finished = True + elif step.step == "out": + step_finished = True + while frame_obj is not None: + if frame_obj is step.origin: + step_finished = False + break + frame_obj = frame_obj.f_back + elif step.step == "over": + step_finished = True + while frame_obj is not None: + if ( + frame_obj is step.origin + and frame_obj.f_lineno == step.origin_line + ): + step_finished = False + break + frame_obj = frame_obj.f_back + else: + raise ValueError(f"Unknown step type: {step.step}") + + if step_finished: + self.log.info(f"Step-{step.step} finished on thread {thread.id}.") + del self._steps[thread.id] + self._pause_ids.add(thread.id) + self._cvar.notify_all() + stop_reason = "step" + + if stop_reason is not None: + # Even if this thread is pausing, any debugpy internal code on it should + # keep running until it returns to user code; otherwise, it may deadlock + # if it was holding e.g. a messaging lock. + print(frame_obj.f_globals.get("__name__")) + if not frame_obj.f_globals.get("__name__", "").startswith("debugpy"): + return self._stop(frame_obj, stop_reason) + + self.log.debug(f"Resolving path {code.co_filename!r}...") + path = self.Path(code.co_filename) + try: + path = path.resolve() + except (OSError, RuntimeError): + pass + self.log.debug(f"Path {code.co_filename!r} resolved to {path}.") + + bps = self.Breakpoint.at(path, line_number) + if not bps and not is_stepping: + self.log.debug(f"No breakpoints at {path}:{line_number}.") + return self.monitoring.DISABLE + self.log.debug(f"Considering breakpoints: {[bp.__getstate__() for bp in bps]}.") + + frame = self.StackFrame(thread, self.inspect.currentframe().f_back) + try: + stop_bps = [] + for bp in bps: + match bp.is_triggered(frame): + case str() as message: + # Triggered, has logMessage - print it but don't stop. + self.adapter.channel.send_event( + "output", + { + "category": "console", + "output": message, + "line": line_number, + "source": {"path": path}, + }, + ) + case triggered if triggered: + # Triggered, no logMessage - stop. + stop_bps.append(bp) + case _: + continue + + if stop_bps: + self.log.info( + f"Stack frame {frame} stopping at breakpoints {[bp.__getstate__() for bp in stop_bps]}." + ) + with self._cvar: + self._pause_ids.add(thread.id) + self._cvar.notify_all() + return self._stop(frame.frame_object, "breakpoint", stop_bps) + finally: + del frame + + def _trace_py_start(self, code: CodeType, ip: int): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + self.log.debug(f"sys.monitoring event: PY_START({code}, {ip})") + + def _trace_py_resume(self, code: CodeType, ip: int): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + self.log.debug(f"sys.monitoring event: PY_RESUME({code}, {ip})") + + def _trace_py_return(self, code: CodeType, ip: int, retval: object): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + self.log.debug(f"sys.monitoring event: PY_RETURN({code}, {ip})") + # TODO: capture returned value to report it when client requests locals. + pass + + def _trace_py_yield(self, code: CodeType, ip: int, retval: object): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return self.monitoring.DISABLE + self.log.debug(f"sys.monitoring event: PY_YIELD({code}, {ip})") + # TODO: capture yielded value to report it when client requests locals. + pass + + def _trace_py_throw(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: PY_THROW({code}, {ip}, {type(exc).__qualname__})" + ) + + def _trace_py_unwind(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: PY_UNWIND({code}, {ip}, {type(exc).__qualname__})" + ) + + def _trace_raise(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: RAISE({code}, {ip}, {type(exc).__qualname__})" + ) + + def _trace_reraise(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: RERAISE({code}, {ip}, {type(exc).__qualname__})" + ) + + def _trace_exception_handled(self, code: CodeType, ip: int, exc: BaseException): + thread = self.Thread.from_python_thread() + if not thread.is_traced: + return + self.log.debug( + f"sys.monitoring event: EXCEPTION_HANDLED({code}, {ip}, {type(exc).__qualname__})" + ) + + +Tracer.instance = Tracer()