Source code for simvx.ide.debug_session

"""Debug session — DAP client lifecycle, breakpoint sync, IDE menu actions."""

import logging
from typing import TYPE_CHECKING, Any

from simvx.core import Signal

from .dap.client import DAPClient

if TYPE_CHECKING:
    from .app import Root
    from .config import Config
    from .state import State

log = logging.getLogger(__name__)

[docs] class DebugSession: """Owns the active debug session and exposes IDE menu/run actions. Creates/destroys DAPClient instances per session, syncs breakpoints from State, caches thread/stack/scope/variable data for the UI, and provides the run/debug action handlers used by menu items and shortcuts. """ def __init__(self, state: State, config: Config, ide: Root | None = None): self._state = state self._config = config self._ide = ide # Optional: required only by IDE-action handlers (on_run_file etc.) self._client: DAPClient | None = None self._debug_state: str = "idle" # idle | running | stopped self._current_thread_id: int = 0 self._current_frame_id: int = 0 self._current_file: str = "" self._current_line: int = 0 self._launch_path: str = "" # Breakpoint conditions: path -> {line: condition_expr} self._conditions: dict[str, dict[int, str]] = {} # Signal forwarded from DAPClient when debugpy requests runInTerminal self.on_run_in_terminal = Signal() # Cached data for UI self._threads: list[dict[str, Any]] = [] self._stack_frames: list[dict[str, Any]] = [] self._scopes: list[dict[str, Any]] = [] self._variables: dict[int, list[dict[str, Any]]] = {} # Connect to state signals self._state.debug_state_changed.connect(self._on_debug_state_changed) self._state.breakpoint_toggled.connect(self._on_breakpoint_toggled) # -- IDE convenience accessors -------------------------------------------- @property def _bottom_tabs(self): return self._ide._bottom_tabs if self._ide else None @property def _terminal_panel(self): return self._ide._terminal_panel if self._ide else None # -- Public API ------------------------------------------------------------
[docs] def start_debug(self, path: str): """Start debugging a Python file.""" if self._client and self._client.running: self.stop_debug() self._clear_cache() env = self._config.get_env(self._state.project_root) python = self._config.get_python_command(self._state.project_root) self._client = DAPClient(self._state, env=env) self._client.on_initialized.connect(self._after_initialize) self._client.on_stopped.connect(self._on_stopped_event) self._client.on_terminated.connect(self._on_terminated) self._client.on_run_in_terminal.connect(lambda cmd, env: self.on_run_in_terminal.emit(cmd, env)) self._client.start(python=python) self._launch_path = path
[docs] def stop_debug(self): """Stop the current debug session.""" if self._client: self._client.stop() self._client = None self._debug_state = "idle" self._clear_cache()
[docs] def continue_execution(self): if self._client and self._debug_state == "stopped": self._client.continue_execution(self._current_thread_id)
[docs] def step_over(self): if self._client and self._debug_state == "stopped": self._client.step_over(self._current_thread_id)
[docs] def step_into(self): if self._client and self._debug_state == "stopped": self._client.step_into(self._current_thread_id)
[docs] def step_out(self): if self._client and self._debug_state == "stopped": self._client.step_out(self._current_thread_id)
[docs] def pause(self): if self._client and self._debug_state == "running": self._client.pause(self._current_thread_id)
[docs] def toggle_breakpoint(self, path: str, line: int): self._state.toggle_breakpoint(path, line)
[docs] def evaluate(self, expr: str, callback=None): """Evaluate expression in the current frame context. If *callback* is provided, the parsed result string (or dict on error) is forwarded to it instead of being emitted to debug_output. """ if self._client and self._debug_state == "stopped" and self._current_frame_id: if callback: def _wrap(msg, cb=callback): body = msg.get("body", {}) if msg.get("success"): cb(body.get("result", str(body))) else: cb({"error": msg.get("message", "eval failed"), "result": msg.get("message", "")}) self._client.evaluate(expr, self._current_frame_id, _wrap) else: self._client.evaluate(expr, self._current_frame_id, self._on_evaluate_response)
[docs] def set_breakpoint_condition(self, path: str, line: int, condition: str): """Set or clear a condition on a breakpoint.""" if condition: self._conditions.setdefault(path, {})[line] = condition else: conds = self._conditions.get(path, {}) conds.pop(line, None) if not conds: self._conditions.pop(path, None) # Re-sync breakpoints with adapter if live if self._client and self._client.running: lines = sorted(self._state.get_breakpoints(path)) self._set_breakpoints_with_conditions(path, lines)
[docs] def get_breakpoint_condition(self, path: str, line: int) -> str: """Get the condition for a breakpoint, or empty string.""" return self._conditions.get(path, {}).get(line, "")
[docs] def get_conditions_for_file(self, path: str) -> dict[int, str]: """Get all conditions for a file as {line: condition}.""" return dict(self._conditions.get(path, {}))
def _set_breakpoints_with_conditions(self, path: str, lines: list[int]): """Send setBreakpoints with optional conditions from our local store.""" conds = self._conditions.get(path, {}) has_conditions = any(conds.get(ln) for ln in lines) if not has_conditions: # Fast path: no conditions, use standard API self._client.set_breakpoints(path, lines) return # Build breakpoint list with conditions and send directly bp_list = [] for ln in lines: bp: dict[str, Any] = {"line": ln} cond = conds.get(ln, "") if cond: bp["condition"] = cond bp_list.append(bp) self._client._send_request( "setBreakpoints", { "source": {"path": path}, "breakpoints": bp_list, "sourceModified": False, }, self._client._on_set_breakpoints_response, )
[docs] def select_frame(self, frame_index: int): """Select a stack frame by index, refreshing scopes and variables.""" if frame_index < 0 or frame_index >= len(self._stack_frames): return frame = self._stack_frames[frame_index] self._current_frame_id = frame.get("id", 0) source = frame.get("source", {}) self._current_file = source.get("path", "") self._current_line = frame.get("line", 0) if self._current_file and self._current_line: self._state.goto_requested.emit(self._current_file, self._current_line, 0) self._fetch_scopes()
[docs] def fetch_variables(self, variables_ref: int, callback=None): """Fetch variables for a scope or nested object.""" if self._client: def on_vars(resp): body = resp.get("body", {}) var_list = body.get("variables", []) self._variables[variables_ref] = var_list if callback: callback(var_list) self._client.get_variables(variables_ref, on_vars)
# -- Properties ------------------------------------------------------------
[docs] @property def is_debugging(self) -> bool: return self._debug_state != "idle"
[docs] @property def debug_state(self) -> str: return self._debug_state
[docs] @property def current_file(self) -> str: return self._current_file
[docs] @property def current_line(self) -> int: return self._current_line
[docs] @property def threads(self) -> list[dict[str, Any]]: return self._threads
[docs] @property def stack_frames(self) -> list[dict[str, Any]]: return self._stack_frames
[docs] @property def scopes(self) -> list[dict[str, Any]]: return self._scopes
[docs] @property def variables(self) -> dict[int, list[dict[str, Any]]]: return self._variables
# -- IDE menu / shortcut action handlers -----------------------------------
[docs] def on_run_file(self): if not self._ide or not self._state.active_file: return if self._state.active_file.startswith("untitled"): self._state.status_message.emit("Save file first before running") return # If breakpoints exist, use debugger if self._state.all_breakpoints: self.start_debug(self._state.active_file) self._ide._show_bottom_panel() if self._bottom_tabs: tab_controls = [c for c in self._bottom_tabs.children if hasattr(c, "visible")] for i, c in enumerate(tab_controls): if getattr(c, "name", "") == "Debug": self._bottom_tabs.current_tab = i self._bottom_tabs._update_layout() break self._state.status_message.emit("Debugging started...") else: self._state.run_requested.emit(self._state.active_file)
[docs] def on_run_no_debug(self): if self._state.active_file: self._run_in_terminal(self._state.active_file)
[docs] def on_run_requested(self, path: str): self._run_in_terminal(path)
def _run_in_terminal(self, path: str): if not self._ide: return self._ide._show_bottom_panel() self._ide._switch_to_tab("Terminal") if self._terminal_panel: self._terminal_panel.run_file(path)
[docs] def on_toggle_breakpoint(self): if self._state.active_file: self._state.toggle_breakpoint(self._state.active_file, self._state.cursor_line)
[docs] def on_step_over(self): self.step_over()
[docs] def on_step_into(self): self.step_into()
[docs] def on_step_out(self): self.step_out()
[docs] def on_stop_debug(self): self.stop_debug() self._state.debug_stopped.emit()
[docs] def on_restart_debug(self): path = self._state.active_file self.stop_debug() if path: self.start_debug(path)
# -- Internal: launch sequence --------------------------------------------- def _after_initialize(self): """Called after initialize response -- set breakpoints, launch, configurationDone.""" # Sync all breakpoints (with conditions if any) all_bp = self._state.all_breakpoints for path, lines in all_bp.items(): if lines: self._set_breakpoints_with_conditions(path, sorted(lines)) # Launch target self._client.launch(self._launch_path) # Signal configuration complete self._client.configuration_done() # -- Internal: event handlers ---------------------------------------------- def _on_debug_state_changed(self, state_name: str, data: dict): self._debug_state = state_name if state_name == "stopped": thread_id = data.get("thread_id", 0) if thread_id: self._current_thread_id = thread_id self._fetch_threads() self._fetch_stack_trace() def _on_stopped_event(self, body: dict): reason = body.get("reason", "") if reason == "exception": pass # Exception navigation handled by DAPClient._navigate_to_exception def _on_terminated(self): self._debug_state = "idle" self._clear_cache() def _on_breakpoint_toggled(self, path: str, line: int): """Re-sync breakpoints for the file when toggled during a live session.""" if not self._client or not self._client.running: return lines = sorted(self._state.get_breakpoints(path)) self._set_breakpoints_with_conditions(path, lines) def _on_evaluate_response(self, msg: dict): body = msg.get("body", {}) result = body.get("result", "") if result: self._state.debug_output.emit(result + "\n", "console") # -- Internal: data fetching ----------------------------------------------- def _fetch_threads(self): if not self._client: return self._client.get_threads(self._on_threads_response) def _on_threads_response(self, msg: dict): body = msg.get("body", {}) self._threads = body.get("threads", []) if self._threads and not self._current_thread_id: self._current_thread_id = self._threads[0].get("id", 0) def _fetch_stack_trace(self): if not self._client or not self._current_thread_id: return self._client.get_stack_trace(self._current_thread_id, self._on_stack_trace_response) def _on_stack_trace_response(self, msg: dict): body = msg.get("body", {}) self._stack_frames = body.get("stackFrames", []) if self._stack_frames: top = self._stack_frames[0] self._current_frame_id = top.get("id", 0) source = top.get("source", {}) self._current_file = source.get("path", "") self._current_line = top.get("line", 0) if self._current_file and self._current_line: self._state.goto_requested.emit(self._current_file, self._current_line, 0) self._fetch_scopes() def _fetch_scopes(self): if not self._client or not self._current_frame_id: return self._client.get_scopes(self._current_frame_id, self._on_scopes_response) def _on_scopes_response(self, msg: dict): body = msg.get("body", {}) self._scopes = body.get("scopes", []) self._variables.clear() for scope in self._scopes: ref = scope.get("variablesReference", 0) if ref: self.fetch_variables(ref) # -- Cache ----------------------------------------------------------------- def _clear_cache(self): self._threads.clear() self._stack_frames.clear() self._scopes.clear() self._variables.clear() self._current_thread_id = 0 self._current_frame_id = 0 self._current_file = "" self._current_line = 0
[docs] def process(self, dt: float): """Tick the DAP client to poll subprocess I/O.""" if self._client: self._client.process(dt)