"""Play Mode — Run/pause/stop lifecycle for in-editor game preview.
Manages scene serialization/restoration, camera switching, input routing,
and per-frame process/physics updates during play mode. The game runs in
a separate SceneTree with isolated input state so editor and game input
never interfere.
Usage:
play_mode = PlayMode(editor_state)
play_mode.start() # F5 — serialize scene, begin processing
play_mode.toggle_pause() # F7 — pause/resume processing
play_mode.stop() # F6 — restore pre-play scene state
# Called each frame by the editor's main loop:
play_mode.update(dt)
"""
import logging
import math
import os
import subprocess
import time
import traceback as _tb
from pathlib import Path
from simvx.core import Camera3D, Input, InputMap, MouseButton, Node, OrbitCamera3D, SceneTree
from simvx.core._scene_internal import _deserialize_node, _serialize_node
from simvx.core.debug.profiler import FrameProfiler
from simvx.core.hot_reload import HotReloadManager
from .state import State
# Logger used by PlayMode for lifecycle warnings and for script errors raised
# by game nodes during ready/process/physics_process.
log = logging.getLogger("simvx.play")
# Separate logger used only for hot-reload failures (watch registration and polling).
_hot_reload_log = logging.getLogger("simvx.editor.play_mode")
# Directories to skip when discovering watchable .py files under the project root.
# Matched against bare directory names at every level of the walk.
_HOT_RELOAD_SKIP_DIRS = frozenset({".venv", "venv", "__pycache__", "node_modules", "dist", "build", ".git"})
# Orbit camera input sensitivity (consumed by PlayMode._handle_orbit_input).
_ORBIT_SENSITIVITY = math.radians(0.35) # radians per pixel of mouse movement (0.35 degrees per pixel)
_ZOOM_SENSITIVITY = 1.5 # multiplier applied to the vertical scroll delta before it is passed to cam.zoom()
# Orbit pitch is clamped to the closed range [_PITCH_MIN, _PITCH_MAX], i.e. [-85°, -2°].
_PITCH_MIN = math.radians(-85.0) # upper half-sphere: can't go below floor
_PITCH_MAX = math.radians(-2.0) # prevent flipping to underside
# Border colours for the viewport overlay (RGBA, 0-1 range), returned by
# PlayMode.get_border_colour() according to the current play state.
_COLOUR_PLAYING = (0.2, 0.8, 0.2, 1.0) # Green — game is running
_COLOUR_PAUSED = (1.0, 0.6, 0.0, 1.0) # Orange — game is paused
_COLOUR_STOPPED = None # No border when stopped
# ======================================================================
# Isolated input state for game tree
# ======================================================================
class _GameInputState:
"""Snapshot-able container for Input class state.
Holds all the mutable dicts/sets that Input uses as ClassVars.
Used to swap game input state in and out of the Input singleton
so game scripts see isolated input without code changes.
"""
__slots__ = (
"keys", "keys_just_pressed", "keys_just_released",
"mouse_pos", "mouse_delta", "scroll_delta",
"gamepad_buttons", "gamepad_axes",
"keys_pressed", "keys_just_pressed_typed", "keys_just_released_typed",
"mouse_buttons_pressed", "mouse_buttons_just_pressed", "mouse_buttons_just_released",
"joy_axes", "joy_buttons_pressed", "joy_buttons_just_pressed", "joy_buttons_just_released",
"touches", "touches_just_pressed", "touches_just_released",
"actions",
)
def __init__(self):
self.keys: dict[str, bool] = {}
self.keys_just_pressed: dict[str, bool] = {}
self.keys_just_released: dict[str, bool] = {}
self.mouse_pos: tuple[float, float] = (0.0, 0.0)
self.mouse_delta: tuple[float, float] = (0.0, 0.0)
self.scroll_delta: tuple[float, float] = (0.0, 0.0)
self.gamepad_buttons: dict[int, dict[str, bool]] = {}
self.gamepad_axes: dict[int, dict[str, float]] = {}
self.keys_pressed: set[int] = set()
self.keys_just_pressed_typed: set[int] = set()
self.keys_just_released_typed: set[int] = set()
self.mouse_buttons_pressed: set[int] = set()
self.mouse_buttons_just_pressed: set[int] = set()
self.mouse_buttons_just_released: set[int] = set()
self.joy_axes: dict[int, float] = {}
self.joy_buttons_pressed: set[int] = set()
self.joy_buttons_just_pressed: set[int] = set()
self.joy_buttons_just_released: set[int] = set()
self.touches: dict[int, tuple[float, float, float]] = {}
self.touches_just_pressed: dict[int, tuple[float, float, float]] = {}
self.touches_just_released: set[int] = set()
# Isolated action bindings: dict[str, list[InputBinding]]
self.actions: dict = {}
def new_frame(self) -> None:
"""Clear per-frame just-pressed/released sets (mirrors Input._new_frame)."""
self.keys_just_pressed_typed.clear()
self.keys_just_released_typed.clear()
self.mouse_buttons_just_pressed.clear()
self.mouse_buttons_just_released.clear()
self.joy_buttons_just_pressed.clear()
self.joy_buttons_just_released.clear()
def end_frame(self) -> None:
"""Clear all per-frame transient state (mirrors Input._end_frame)."""
self.keys_just_pressed.clear()
self.keys_just_released.clear()
self.keys_just_pressed_typed.clear()
self.keys_just_released_typed.clear()
self.mouse_buttons_just_pressed.clear()
self.mouse_buttons_just_released.clear()
self.joy_buttons_just_pressed.clear()
self.joy_buttons_just_released.clear()
self.mouse_delta = (0.0, 0.0)
self.scroll_delta = (0.0, 0.0)
self.touches_just_pressed.clear()
self.touches_just_released.clear()
# -- Event injection (same logic as Input._on_key etc.) --
def on_key(self, key: int, pressed: bool) -> None:
if pressed:
if key not in self.keys_pressed:
self.keys_just_pressed_typed.add(key)
self.keys_pressed.add(key)
else:
self.keys_pressed.discard(key)
self.keys_just_released_typed.add(key)
def on_mouse_button(self, button: int, pressed: bool) -> None:
if pressed:
if button not in self.mouse_buttons_pressed:
self.mouse_buttons_just_pressed.add(button)
self.mouse_buttons_pressed.add(button)
else:
self.mouse_buttons_pressed.discard(button)
self.mouse_buttons_just_released.add(button)
def on_mouse_move(self, x: float, y: float) -> None:
old = self.mouse_pos
self.mouse_pos = (x, y)
self.mouse_delta = (x - old[0], y - old[1])
def on_scroll(self, dx: float, dy: float) -> None:
self.scroll_delta = (dx, dy)
def _install_game_input(state: _GameInputState) -> None:
    """Point the ``Input``/``InputMap`` singletons at the containers held by *state*.

    Each field of *state* is assigned to the ``Input`` class attribute of the
    same name with a leading underscore; the action table goes to
    ``InputMap._actions``. Objects are assigned by reference, not copied.
    """
    for field in (
        "keys",
        "keys_just_pressed",
        "keys_just_released",
        "mouse_pos",
        "mouse_delta",
        "scroll_delta",
        "gamepad_buttons",
        "gamepad_axes",
        "keys_pressed",
        "keys_just_pressed_typed",
        "keys_just_released_typed",
        "mouse_buttons_pressed",
        "mouse_buttons_just_pressed",
        "mouse_buttons_just_released",
        "joy_axes",
        "joy_buttons_pressed",
        "joy_buttons_just_pressed",
        "joy_buttons_just_released",
        "touches",
        "touches_just_pressed",
        "touches_just_released",
    ):
        setattr(Input, "_" + field, getattr(state, field))
    InputMap._actions = state.actions
def _snapshot_editor_input() -> _GameInputState:
    """Return a ``_GameInputState`` referencing whatever ``Input``/``InputMap`` hold right now.

    The instance is created without running ``__init__`` so no fresh
    containers are allocated; every slot is bound to the object currently
    installed on the singleton (by reference, not copied).
    """
    snapshot = _GameInputState.__new__(_GameInputState)
    for field in (
        "keys",
        "keys_just_pressed",
        "keys_just_released",
        "mouse_pos",
        "mouse_delta",
        "scroll_delta",
        "gamepad_buttons",
        "gamepad_axes",
        "keys_pressed",
        "keys_just_pressed_typed",
        "keys_just_released_typed",
        "mouse_buttons_pressed",
        "mouse_buttons_just_pressed",
        "mouse_buttons_just_released",
        "joy_axes",
        "joy_buttons_pressed",
        "joy_buttons_just_pressed",
        "joy_buttons_just_released",
        "touches",
        "touches_just_pressed",
        "touches_just_released",
    ):
        setattr(snapshot, field, getattr(Input, "_" + field))
    snapshot.actions = InputMap._actions
    return snapshot
# ======================================================================
# PlayMode
# ======================================================================
[docs]
class PlayMode:
    """Manages the run/pause/stop lifecycle for previewing games in the editor.

    The game scene runs in an isolated SceneTree with its own input state.
    The editor's process loop calls ``update(dt)`` every frame to drive
    the game cooperatively (no threading).

    A game can alternatively be launched in its own OS window with
    :meth:`start_detached`; in that mode there is no in-editor game tree and
    only the subprocess handle is tracked.
    """

    # Seconds to wait for a detached game process to exit after ``terminate()``
    # before it is force-killed.
    _DETACHED_EXIT_TIMEOUT: float = 1.0

    def __init__(self, state: State):
        self._state = state
        # Serialized snapshot of the scene before play started
        self._saved_scene_data: dict | None = None
        # Separate SceneTree for the running game
        self._game_tree: SceneTree | None = None
        # Isolated input state for the game
        self._game_input: _GameInputState | None = None
        # The game's own camera (if the scene contains one)
        self._game_camera: Camera3D | None = None
        # Runtime metrics
        self._elapsed_time: float = 0.0
        self._frame_count: int = 0
        # Whether ready() has been called on the game tree
        self._ready_called: bool = False
        # Orbit camera drag state
        self._orbit_dragging: bool = False
        self._last_mouse: tuple[float, float] = (0.0, 0.0)
        # Detached game window subprocess
        self._detached_proc: subprocess.Popen | None = None
        # Scene transition tracking
        self._last_root: Node | None = None
        # GPU game viewport renderer (created when an App with graphics is available)
        self._game_viewport = None  # GameViewportRenderer | None
        self._game_viewport_enabled: bool = False
        # Buffered UI events for game tree dispatch (drained during update)
        self._ui_event_queue: list[dict] = []
        # Optional profiler -- when set, update() times phases and samples
        # per-node costs into it. Assigned by Root at init time.
        self._profiler: FrameProfiler | None = None
        # Hot-reload manager (created on start, torn down on stop). Watches
        # project script files and class-swaps live nodes when sources change.
        self._hot_reload_mgr: HotReloadManager | None = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def start(self, on_tree_created=None) -> None:
        """Begin play mode (triggered by F5).

        Serializes the current scene so it can be restored on stop,
        creates an isolated game tree with its own input state, and
        calls ready() on all game nodes. Does nothing if already playing,
        if there is no scene root, or if the scene cannot be cloned.

        Args:
            on_tree_created: Optional callback ``fn(game_root)`` invoked after
                the game tree is constructed but before ``ready()`` is called.
                Used by the editor to load scripts onto the cloned tree.
        """
        if self._state.is_playing:
            return
        root = self._get_root()
        if root is None:
            return

        # Deep-serialize the scene, then clone it into a separate game tree.
        snapshot = _serialize_node(root)
        game_root = _deserialize_node(snapshot)
        if game_root is None:
            return
        # Only keep the snapshot once the clone succeeded, so an aborted start
        # does not leave stale scene data behind.
        self._saved_scene_data = snapshot

        self._game_tree = SceneTree(isolated_input=True)
        self._game_tree.root = game_root
        game_root._enter_tree(self._game_tree)

        # Create isolated input state for the game
        self._game_input = _GameInputState()
        # Locate the game's Camera3D (if any) for rendering
        self._game_camera = self._find_game_camera(game_root)

        # Reset runtime metrics
        self._elapsed_time = 0.0
        self._frame_count = 0
        self._ready_called = False

        # User game code gets recovery behaviour, not crashes
        Node.strict_errors = False

        # Flip the state flags
        self._state.is_playing = True
        self._state.is_paused = False
        self._state.play_state_changed.emit()

        # Hook for script loading (before ready)
        if on_tree_created is not None:
            on_tree_created(game_root)

        # Initialize the game tree -- call ready() on all nodes
        self._call_ready(game_root)
        self._ready_called = True
        self._last_root = game_root

        # Wire hot reload: watch project scripts for live class-swap. The
        # toolbar toggle stores its preference on State; honour it now
        # so the user's last choice persists across PlayMode restarts.
        self._hot_reload_mgr = HotReloadManager(self._game_tree)
        self._hot_reload_mgr.enabled = self._state.hot_reload_enabled
        for path in self._discover_watchable_scripts():
            try:
                self._hot_reload_mgr.watch(path)
            except Exception:
                # Justified: a single un-importable script must not abort play start.
                _hot_reload_log.exception("hot_reload: failed to watch %s", path)

    def set_hot_reload_enabled(self, enabled: bool) -> None:
        """Enable or disable script hot-reload during play mode.

        Updates :attr:`State.hot_reload_enabled` AND, when a play
        session is active, the live :class:`HotReloadManager` so the change
        takes effect immediately without restarting PlayMode.
        """
        flag = bool(enabled)
        self._state.hot_reload_enabled = flag
        if self._hot_reload_mgr is not None:
            self._hot_reload_mgr.enabled = flag

    def toggle_pause(self) -> None:
        """Toggle pause during play mode (triggered by F7).

        When paused, process/physics updates are skipped but the scene
        continues to render so the user can inspect the frozen state.
        Does nothing when not playing.
        """
        if not self._state.is_playing:
            return
        self._state.is_paused = not self._state.is_paused
        self._state.play_state_changed.emit()

    def stop(self) -> None:
        """Stop play mode and restore the pre-play scene (triggered by F6).

        Destroys the game tree and input state, deserializes the saved
        snapshot back into the editor's scene tree, stops any detached game
        process and releases the game viewport. Does nothing when not playing.
        """
        if not self._state.is_playing:
            return

        # Clear play flags first
        self._state.is_playing = False
        self._state.is_paused = False
        # Restore strict errors for editor's own code
        Node.strict_errors = True

        # Tear down the hot-reload manager (unwatch all files).
        mgr = self._hot_reload_mgr
        if mgr is not None:
            for path in list(mgr.watched_files):
                mgr.unwatch(path)
            self._hot_reload_mgr = None

        # Tear down the game tree
        self._game_tree = None
        self._game_input = None
        self._ui_event_queue.clear()

        # Restore scene from the saved snapshot
        if self._saved_scene_data is not None:
            restored_root = _deserialize_node(self._saved_scene_data)
            if restored_root is not None:
                self._state.edited_scene.set_root(restored_root)
            self._saved_scene_data = None

        # Stop any detached process
        self._terminate_detached()
        # Destroy game viewport renderer
        self.destroy_game_viewport()

        # Discard runtime references
        self._game_camera = None
        self._elapsed_time = 0.0
        self._frame_count = 0
        self._ready_called = False
        self._last_root = None

        # Notify listeners
        self._state.play_state_changed.emit()
        self._state.scene_changed.emit()

    def start_detached(self) -> None:
        """Start the game in a separate OS window via subprocess.

        Runs ``simvx run`` with the project directory as working directory.
        Does nothing if already playing or if no project path is set.
        """
        if self._state.is_playing:
            return
        project_dir = self._state.project_path
        if project_dir is None:
            log.warning("Cannot start detached: no project path set")
            return
        try:
            self._detached_proc = subprocess.Popen(["simvx", "run"], cwd=str(project_dir))
        except FileNotFoundError:
            log.error("'simvx' command not found — is simvx-core installed?")
            return
        self._state.is_playing = True
        # A pause flag left over from an earlier session must not make the new
        # session start out "paused".
        self._state.is_paused = False
        self._state.play_state_changed.emit()

    def stop_detached(self) -> None:
        """Stop the detached game window if running.

        When no in-editor game tree exists, the play flags are cleared too
        (including the pause flag) and ``play_state_changed`` is emitted.
        """
        self._terminate_detached()
        if self._state.is_playing and self._game_tree is None:
            self._state.is_playing = False
            self._state.is_paused = False
            self._state.play_state_changed.emit()

    def _terminate_detached(self) -> None:
        """Terminate and reap the detached game process, if there is one.

        Sends ``terminate()`` and waits up to ``_DETACHED_EXIT_TIMEOUT``
        seconds; a process that has not exited by then is killed. Waiting
        reaps the child so it does not linger as a zombie.
        """
        proc = self._detached_proc
        self._detached_proc = None
        if proc is None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=self._DETACHED_EXIT_TIMEOUT)
        except subprocess.TimeoutExpired:
            log.warning("Detached game did not exit after terminate(); killing it")
            proc.kill()
            proc.wait()

    @property
    def is_detached(self) -> bool:
        """Whether a detached game process handle is currently held."""
        return self._detached_proc is not None

    def _check_scene_transition(self) -> None:
        """Detect if the game tree's root changed (scene transition).

        On a change the tracked root and game camera are refreshed and
        ``scene_changed`` is emitted.
        """
        if self._game_tree is None:
            return
        current_root = self._game_tree.root
        if current_root is not self._last_root:
            self._last_root = current_root
            self._game_camera = self._find_game_camera(current_root) if current_root else None
            self._state.scene_changed.emit()

    def attach_profiler(self, profiler: FrameProfiler | None) -> None:
        """Attach (or detach) a :class:`FrameProfiler` for play-mode metrics.

        When attached and enabled, :meth:`update` times each phase
        (``physics``, ``process``, ``ui``, ``total``) and samples per-node
        timings into the profiler. Pass ``None`` to detach.
        """
        self._profiler = profiler

    @property
    def profiler(self) -> FrameProfiler | None:
        """The attached profiler, or None."""
        return self._profiler

    def update(self, dt: float) -> None:
        """Per-frame update, called by the editor's process loop.

        When playing and not paused, swaps game input state into the
        Input singleton, processes the game tree, then restores editor
        input. This lets game scripts use ``Input.is_action_pressed()``
        etc. transparently.

        Metrics and hot-reload polling also run while paused.
        """
        if not self._state.is_playing:
            self._orbit_dragging = False
            self._game_camera = None
            return

        # Check for scene transitions
        self._check_scene_transition()

        # Always tick metrics (even when paused, for display purposes)
        self._elapsed_time += dt
        self._frame_count += 1

        # Hold the tree in a local: game code run below may end play mode and
        # clear self._game_tree, but this frame must still finish cleanly.
        tree = self._game_tree

        # Lazily locate the game camera
        if self._game_camera is None and tree is not None and tree.root is not None:
            self._game_camera = self._find_game_camera(tree.root)

        # Poll hot-reload manager so script edits apply live (also during pause).
        # Justified: reload errors during play must NOT crash the editor; scripts are
        # frequently mid-edit and import/syntax failures are a normal recoverable state.
        if self._hot_reload_mgr is not None:
            try:
                self._hot_reload_mgr.poll(dt)
            except Exception:
                _hot_reload_log.exception("hot_reload: poll failed")

        if self._state.is_paused:
            return
        if tree is None or tree.root is None:
            return
        root = tree.root
        gi = self._game_input

        # A disabled profiler is treated exactly like no profiler, so it never
        # receives phase markers without the matching "total"/end_frame calls.
        prof = self._profiler
        if prof is not None and not prof.enabled:
            prof = None

        # Select timing-aware or plain tree walkers based on profiler presence.
        if prof is not None:
            prof.begin("total")
            process_tree = self._process_tree_profiled
            physics_tree = self._physics_process_tree_profiled
        else:
            process_tree = PlayMode._process_tree  # type: ignore[assignment]
            physics_tree = PlayMode._physics_process_tree  # type: ignore[assignment]

        if gi is None:
            # Fallback: process without isolated input
            self._run_phases(root, dt, prof, physics_tree, process_tree)
            tree._flush_deletes()
        else:
            # Swap in game input, dispatch UI events, process, swap back
            editor_snap = _snapshot_editor_input()
            gi.new_frame()
            _install_game_input(gi)
            try:
                # Dispatch queued UI events to the game tree's UIInputManager
                if self._ui_event_queue:
                    if prof is not None:
                        prof.begin("ui")
                    for evt in self._ui_event_queue:
                        tree.ui_input(**evt)
                    self._ui_event_queue.clear()
                    if prof is not None:
                        prof.end("ui")
                self._run_phases(root, dt, prof, physics_tree, process_tree)
            finally:
                # Capture updated mouse_pos etc. back into game state
                gi.mouse_pos = Input._mouse_pos
                gi.mouse_delta = Input._mouse_delta
                gi.scroll_delta = Input._scroll_delta
                gi.end_frame()
                _install_game_input(editor_snap)
                # Flush any nodes queued for deletion in the game tree
                tree._flush_deletes()

        if prof is not None:
            prof.count_nodes(tree)
            prof.end("total")
            prof.end_frame()

    @staticmethod
    def _run_phases(root, dt, prof, physics_tree, process_tree) -> None:
        """Run the physics walk then the process walk, bracketing each with profiler markers."""
        if prof is not None:
            prof.begin("physics")
        physics_tree(root, dt)
        if prof is not None:
            prof.end("physics")
            prof.begin("process")
        process_tree(root, dt)
        if prof is not None:
            prof.end("process")

    # ------------------------------------------------------------------
    # Game tree access
    # ------------------------------------------------------------------

    @property
    def game_tree(self) -> SceneTree | None:
        """The game's isolated SceneTree, or None when not playing."""
        return self._game_tree

    # ------------------------------------------------------------------
    # Game viewport (GPU render-to-texture)
    # ------------------------------------------------------------------

    @property
    def game_texture_id(self) -> int | None:
        """Bindless texture ID of the game viewport, or None if not available.

        Returns a valid texture ID when the game viewport renderer is active
        and ready. The viewport panel checks this to decide whether to display
        a GPU-rendered game view or fall back to wireframe.
        """
        gvp = self._game_viewport
        if gvp is not None and gvp.ready:
            return gvp.texture_id
        return None

    @property
    def game_viewport(self):
        """The GameViewportRenderer instance, or None."""
        return self._game_viewport

    def create_game_viewport(self, engine, width: int, height: int) -> None:
        """Create the offscreen game viewport renderer.

        Called by Root when play mode starts and a graphics engine
        is available. Any failure is logged as a warning and leaves the
        viewport disabled instead of raising.
        """
        try:
            # Imported lazily so the editor still loads without the graphics package.
            from simvx.graphics.renderer.game_viewport import GameViewportRenderer

            self._game_viewport = GameViewportRenderer(engine)
            self._game_viewport.create(width, height)
            self._game_viewport_enabled = True
            log.info("Game viewport renderer created (%dx%d)", width, height)
        except Exception:
            log.warning("Could not create game viewport renderer (no GPU?)", exc_info=True)
            self._game_viewport = None
            self._game_viewport_enabled = False

    def resize_game_viewport(self, width: int, height: int) -> None:
        """Resize the offscreen game viewport (no-op when none exists)."""
        if self._game_viewport is not None:
            self._game_viewport.resize(width, height)

    def destroy_game_viewport(self) -> None:
        """Destroy the offscreen game viewport renderer (no-op when none exists)."""
        if self._game_viewport is not None:
            self._game_viewport.destroy()
            self._game_viewport = None
        self._game_viewport_enabled = False

    # ------------------------------------------------------------------
    # Camera management
    # ------------------------------------------------------------------

    def get_active_camera(self) -> Camera3D | None:
        """Return the camera that should drive the viewport.

        During play mode the game's own Camera3D is used when one was found.
        Otherwise the editor's camera is returned.
        """
        if self._state.is_playing and self._game_camera is not None:
            return self._game_camera
        return self._state.editor_camera

    def _find_game_camera(self, root: Node) -> Camera3D | None:
        """Traverse the scene tree and return the first Camera3D node.

        Skips the editor's own camera instance (stored in State).
        Returns None when no other camera exists under *root*.
        """
        editor_cam = self._state.editor_camera
        for node in root.find_all(Camera3D, recursive=True):
            if node is not editor_cam:
                return node
        return None

    def _handle_orbit_input(self) -> None:
        """Handle mouse drag to orbit the game camera during play mode.

        Works with OrbitCamera3D nodes: left-drag orbits, scroll zooms.
        Pitch is clamped to ``[_PITCH_MIN, _PITCH_MAX]``.
        """
        cam = self._game_camera
        if cam is None or not isinstance(cam, OrbitCamera3D):
            return

        mx, my = Input.mouse_position
        if Input.is_mouse_button_pressed(MouseButton.LEFT):
            # The first frame of a drag only records the anchor position;
            # rotation starts on the following frame.
            if self._orbit_dragging:
                dx = mx - self._last_mouse[0]
                dy = my - self._last_mouse[1]
                if dx != 0 or dy != 0:
                    cam.yaw -= dx * _ORBIT_SENSITIVITY
                    cam.pitch = max(_PITCH_MIN, min(_PITCH_MAX, cam.pitch - dy * _ORBIT_SENSITIVITY))
                    cam._update_transform()
            self._orbit_dragging = True
        else:
            self._orbit_dragging = False
        self._last_mouse = (mx, my)

        # Scroll to zoom
        _sx, sy = Input.scroll_delta
        if sy != 0:
            cam.zoom(sy * _ZOOM_SENSITIVITY)

    # ------------------------------------------------------------------
    # Visual indicator
    # ------------------------------------------------------------------

    def get_border_colour(self) -> tuple[float, float, float, float] | None:
        """Return a viewport border colour indicating the current play state.

        * Green ``(0.2, 0.8, 0.2, 1.0)`` -- game is running.
        * Orange ``(1.0, 0.6, 0.0, 1.0)`` -- game is paused.
        * ``None`` -- editor is in normal (stopped) mode.
        """
        if not self._state.is_playing:
            return _COLOUR_STOPPED
        if self._state.is_paused:
            return _COLOUR_PAUSED
        return _COLOUR_PLAYING

    # ------------------------------------------------------------------
    # Runtime metrics (read-only)
    # ------------------------------------------------------------------

    @property
    def elapsed_time(self) -> float:
        """Seconds accumulated by :meth:`update` since play mode started (includes paused time)."""
        return self._elapsed_time

    @property
    def frame_count(self) -> int:
        """Number of :meth:`update` calls since play mode started (includes paused frames)."""
        return self._frame_count

    @property
    def is_active(self) -> bool:
        """Convenience: True when the game is playing (paused or not)."""
        return self._state.is_playing

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_root(self) -> Node | None:
        """Return the editor scene root, or None."""
        tree = self._state.edited_scene
        if tree is None:
            return None
        return tree.root

    def _discover_watchable_scripts(self) -> list[str]:
        """Return paths of project ``.py`` files for hot-reload to watch.

        Walks ``state.project_path`` (when set) and collects every ``.py`` file
        outside the standard skip dirs (``.venv``, ``__pycache__``, etc.).
        Returns an empty list when no project path is configured or the path
        is not a directory.
        """
        project_path = self._state.project_path
        if project_path is None:
            return []
        root = Path(project_path)
        if not root.is_dir():
            return []
        scripts: list[str] = []
        for dirpath, dirnames, filenames in os.walk(root):
            # Prune skip dirs in-place so os.walk doesn't descend into them.
            dirnames[:] = [d for d in dirnames if d not in _HOT_RELOAD_SKIP_DIRS]
            scripts.extend(str(Path(dirpath) / fname) for fname in filenames if fname.endswith(".py"))
        return scripts

    @staticmethod
    def _report_script_error(node: Node, phase: str) -> None:
        """Disable *node* after a script error and notify listeners.

        Must be called from inside an ``except`` block: the active exception
        supplies the traceback that is logged and passed to
        ``Node.script_error_raised``.
        """
        node._script_error = True
        tb = _tb.format_exc()
        log.exception("Script error in %s.%s -- node disabled", node.name, phase)
        try:
            Node.script_error_raised.emit(node, phase, tb)
        except Exception:
            # Justified: a failing listener must not mask or escalate the primary
            # script error (already logged above), but it should leave a trace.
            log.warning("script_error_raised handler failed for %s.%s", node.name, phase, exc_info=True)

    @staticmethod
    def _call_ready(node: Node) -> None:
        """Recursively call ``ready()`` on a node and all its descendants.

        Children are readied before their parent, matching the SceneTree
        convention (depth-first, bottom-up). Errors are logged, not raised.
        """
        for child in list(node.children):
            PlayMode._call_ready(child)
        try:
            node.ready()
        except Exception:
            PlayMode._report_script_error(node, "ready")

    @staticmethod
    def _process_tree(node: Node, dt: float) -> None:
        """Recursively call ``process(dt)`` and tick coroutines.

        A node flagged with a script error is skipped together with its
        subtree. ``AssertionError`` propagates; any other exception disables
        the node and is reported.
        """
        if getattr(node, "_script_error", False):
            return
        try:
            node.process(dt)
        except AssertionError:
            raise
        except Exception:
            PlayMode._report_script_error(node, "process")
            return
        node._tick_coroutines(dt)
        # Iterate over a copy: process() may add or remove children.
        for child in list(node.children):
            PlayMode._process_tree(child, dt)

    @staticmethod
    def _physics_process_tree(node: Node, dt: float) -> None:
        """Recursively call ``physics_process(dt)``.

        Error handling matches :meth:`_process_tree`.
        """
        if getattr(node, "_script_error", False):
            return
        try:
            node.physics_process(dt)
        except AssertionError:
            raise
        except Exception:
            PlayMode._report_script_error(node, "physics_process")
            return
        for child in list(node.children):
            PlayMode._physics_process_tree(child, dt)

    # ------------------------------------------------------------------
    # Profiling variants -- per-node timed tree walks.
    # ------------------------------------------------------------------

    def _process_tree_profiled(self, node: Node, dt: float) -> None:
        """Timed :meth:`_process_tree` that feeds per-node samples to the profiler.

        The sample covers only ``node.process`` and is recorded even when the
        call raises.
        """
        if getattr(node, "_script_error", False):
            return
        prof = self._profiler
        t0 = time.perf_counter()
        try:
            node.process(dt)
        except AssertionError:
            raise
        except Exception:
            PlayMode._report_script_error(node, "process")
            return
        finally:
            if prof is not None:
                prof.sample_node(node.path, "process", (time.perf_counter() - t0) * 1000.0)
        node._tick_coroutines(dt)
        for child in list(node.children):
            self._process_tree_profiled(child, dt)

    def _physics_process_tree_profiled(self, node: Node, dt: float) -> None:
        """Timed :meth:`_physics_process_tree` with per-node samples.

        The sample covers only ``node.physics_process`` and is recorded even
        when the call raises.
        """
        if getattr(node, "_script_error", False):
            return
        prof = self._profiler
        t0 = time.perf_counter()
        try:
            node.physics_process(dt)
        except AssertionError:
            raise
        except Exception:
            PlayMode._report_script_error(node, "physics_process")
            return
        finally:
            if prof is not None:
                prof.sample_node(node.path, "physics", (time.perf_counter() - t0) * 1000.0)
        for child in list(node.children):
            self._physics_process_tree_profiled(child, dt)