Source code for simvx.core.scene_tree

"""SceneTree — Central manager for the node tree, groups, input routing, and UI focus."""

import logging
from contextlib import contextmanager
from typing import Any

import numpy as np

from .descriptors import Signal
from .event_bus import EventBus
from .events import InputEvent
from .math.raycast import ray_intersect_sphere, screen_to_ray
from .input.enums import MouseButton
from .node import Node
from .nodes_2d.camera import Camera2D
from .nodes_3d.camera import Camera3D
from .nodes_3d.mesh import MeshInstance3D
from .physics_nodes import CollisionShape3D

# The "events" autoload name is reserved for the engine-provided EventBus.
# Project TOMLs and runtime code may not declare or replace it.
RESERVED_AUTOLOAD_EVENTS = "events"

log = logging.getLogger(__name__)

[docs] class SceneTree: """Central manager for the node tree, groups, input routing, and UI focus. Owns the root node and drives per-frame ``process`` / ``physics_process`` / ``draw`` traversals. Also manages pause state, scene changes, and the UI input pipeline (mouse, keyboard, popups). """
[docs] @property def app(self): """The App instance running this tree (set by graphics backend).""" return getattr(self, "_app", None)
[docs] @property def events(self) -> EventBus: """Engine-provided typed event bus. Use ``tree.events.connect(EventCls, handler)`` to register a typed handler and ``tree.events.emit(event)`` (or ``emit_deferred``) to publish. The bus survives ``change_scene()`` -- connections held by autoloads or other long-lived objects keep firing across scene swaps. Deferred events queued during a frame are dispatched at the start of the next ``process()`` tick, before any node ``_process`` runs. """ return self._events
[docs] @property def input(self): """The Input instance for this tree (per-tree isolation).""" return self._own_input
[docs] @property def input_map(self): """The InputMap instance for this tree (per-tree isolation).""" return self._own_input_map
[docs] @contextmanager def activate_input(self): """Context manager to make this tree's Input/InputMap the active ones. Use this when processing the tree so that game code calling ``Input.is_action_pressed(...)`` sees this tree's input state. """ from .input.map import set_active_input_map from .input.state import set_active_input with set_active_input(self._own_input), set_active_input_map(self._own_input_map): yield
def __init__(self, screen_size=None, *, isolated_input: bool = False): if isolated_input: from .input.map import _InputMap from .input.state import _Input self._own_input_map = _InputMap() self._own_input = _Input(input_map=self._own_input_map) else: from .input.map import _default_input_map from .input.state import _default_input self._own_input_map = _default_input_map self._own_input = _default_input self.root: Node | None = None self._screen_size: tuple[float, float] = SceneTree._normalize_size(screen_size or (800, 600)) self.paused: bool = False self._delete_queue: list[Node] = [] self._groups: dict[str, set[Node]] = {} self._autoloads: dict[str, Node] = {} # Engine-provided typed event bus, accessible as ``tree.events``. Held # outside ``_autoloads`` because EventBus is not a Node and does not # participate in the process/physics traversal -- the per-frame tick # explicitly calls ``self._events.flush_deferred()``. The name # ``events`` is reserved (see add_autoload() and project.py). self._events: EventBus = EventBus() self._unique_nodes: dict[str, Node] = {} from .ui.ui_input import UIInputManager # local import avoids cycle via ui/testing.py self._ui = UIInputManager() self._current_camera_2d: Camera2D | None = None self.auto_physics: bool = True # Automatically step PhysicsServer each physics tick self.overlay_offset: tuple[float, float] = (0.0, 0.0) # 2D offset for Text2D/particle overlays self.play_viewport_rect: tuple[float, float, float, float] | None = None # (x, y, w, h) to constrain 3D rendering self._structure_version: int = 0 # Incremented on add_child/remove_child for cache invalidation self._running: bool = True self.quit_requested: Signal = Signal() # -- UI state forwarding (preserves external interface) -- @property def _focused_control(self): return self._ui._focused_control @_focused_control.setter def _focused_control(self, v): self._ui._focused_control = v @property def _mouse_grab(self) -> Any: return self._ui._mouse_grab @_mouse_grab.setter def _mouse_grab(self, v): self._ui._mouse_grab = v @property def _last_mouse_pos(self): return self._ui._last_mouse_pos @_last_mouse_pos.setter def _last_mouse_pos(self, v): self._ui._last_mouse_pos = v @property def _popup_stack(self) -> list: return self._ui._popup_stack @_popup_stack.setter def _popup_stack(self, v): self._ui._popup_stack = v @property def _shortcut_handler(self): return self._ui._shortcut_handler @_shortcut_handler.setter def _shortcut_handler(self, v): self._ui._shortcut_handler = v @staticmethod def _normalize_size(sz) -> tuple[float, float]: """Coerce any screen_size representation to a plain tuple once.""" if isinstance(sz, tuple) and len(sz) == 2: return sz if hasattr(sz, 'x'): return (float(sz.x), float(sz.y)) return (float(sz[0]), float(sz[1]))
[docs] @property def is_running(self) -> bool: """Whether the tree is actively being ticked by its driving app. Set to False by :meth:`quit` (or the graphics backend's app ``quit()``), signalling the main loop to exit at the end of the current frame. """ return self._running
[docs] def quit(self) -> None: """Request a clean shutdown of the running tree. Emits :attr:`quit_requested` and flips :attr:`is_running` to False. The driving app polls this state and exits its loop at the end of the current frame. Safe to call from node callbacks or signal handlers. """ if not self._running: return self._running = False self.quit_requested()
@property def screen_size(self) -> tuple[float, float]: return self._screen_size
[docs] @screen_size.setter def screen_size(self, value): old = self._screen_size self._screen_size = SceneTree._normalize_size(value) if self._screen_size != old: self._invalidate_draw_caches()
[docs] def set_root(self, root: Node): """Set the root node of the scene tree.""" log.debug("SceneTree.set_root(%s)", root) self.root = root root._enter_tree(self) root._ready_recursive()
[docs] def change_scene(self, new_root: Node): """Swap the active root with ``new_root``. The old root receives ``_exit_tree``; ``new_root`` then runs the full ``_enter_tree`` / ``_ready_recursive`` path, identical to the initial root. Autoloads are left in place and their groups and unique-name entries are re-registered on the rebuilt tree. Pending deletes, UI popup state, and the active 2D camera are cleared. Use this for title → gameplay → game-over navigation. See :doc:`../patterns` for a full example. """ log.debug("SceneTree.change_scene(%s), old root=%s", new_root, self.root) if self.root: self.root._exit_tree() # Rebuild groups from autoloads only (scene groups were cleared by _exit_tree) self._groups.clear() for node in self._autoloads.values(): self._reregister_groups(node) self._unique_nodes.clear() for node in self._autoloads.values(): self._reregister_unique(node) self._delete_queue.clear() self._ui.reset() self._current_camera_2d = None self.set_root(new_root)
[docs] def process(self, dt: float): """Run process callbacks and coroutines on all nodes for one frame. Order: autoloads' ``_process`` first (Godot-style global singletons), then ``self.events.flush_deferred()`` so deferred events queued during the previous frame (process, physics, or input) reach all subscribers before scene logic runs, then the scene root's ``_process``. Anything ``emit_deferred``'d during this frame's autoload pass is also drained in the same flush, keeping the scene's view of the world consistent. """ with self.activate_input(): from .ui.core import Control Control._current_frame += 1 for node in self._autoloads.values(): node._process_recursive(dt, self.paused) self._events.flush_deferred() if self.root: self.root._process_recursive(dt, self.paused) self._flush_deletes()
[docs] def physics_process(self, dt: float): """Run physics_process callbacks on all nodes, then auto-step physics.""" with self.activate_input(): for node in self._autoloads.values(): node._physics_process_recursive(dt, self.paused) if self.root: self.root._physics_process_recursive(dt, self.paused) if self.auto_physics: from .physics.engine import PhysicsServer server = PhysicsServer.get() if server and server.body_count > 0: server.step(dt)
[docs] def propagate_input(self, event) -> None: """Propagate an input event through the node tree. Walks the tree front-to-back (children before parents, reversed child order). Each node's ``input()`` is called until ``event.handled`` is set. ``unhandled_input()`` is called on all remaining nodes regardless. """ if self.root: self.root._propagate_input(event)
[docs] def draw(self, renderer): cam = self._current_camera_2d _has = hasattr(renderer, 'push_transform') if _has and cam is not None: z = cam.zoom if cam.zoom > 0 else 1.0 sw, sh = self._screen_size sx, sy = cam._shake_offset renderer.push_transform( z, 0.0, 0.0, z, (-float(cam._current[0]) + float(sx)) * z + sw * 0.5, (-float(cam._current[1]) + float(sy)) * z + sh * 0.5, ) if self.root: self.root._draw_recursive(renderer) if _has and cam is not None: renderer.pop_transform() # Draw popups last (on top of everything, in screen space) if self._ui._popup_stack: # Force a new rendering layer so popup fills draw over prior lines if hasattr(renderer, 'new_layer'): renderer.new_layer() # Reset clip to full screen so popups aren't clipped by parent containers if hasattr(renderer, 'reset_clip'): renderer.reset_clip() for popup in self._ui._popup_stack: popup.draw_popup(renderer)
[docs] def push_popup(self, control): """Register a control as an active popup (drawn on top, receives input first).""" self._ui.push_popup(control)
[docs] def pop_popup(self, control): """Unregister a popup control.""" self._ui.pop_popup(control)
[docs] def input_cast(self, screen_pos: tuple[float, float] | np.ndarray, button: MouseButton = MouseButton.LEFT): """Cast a ray from screen_pos through the camera into the scene. Finds the nearest pickable CollisionShape3D and delivers an InputEvent to its parent node.""" if not self.root: return cameras = self.root.find_all(Camera3D) if not cameras: return camera = cameras[0] # screen_size is normalized to tuple — no isinstance per call sw, sh = self.screen_size aspect = sw / sh if sh > 0 else 1.0 view = camera.view_matrix proj = camera.projection_matrix(aspect) origin, direction = screen_to_ray(screen_pos, self.screen_size, view, proj) # Find nearest pickable collision shape best_t = float('inf') best_node = None for shape in self.root.find_all(CollisionShape3D): if not shape.pickable: continue t = ray_intersect_sphere(origin, direction, shape.world_position, shape.radius) if t is not None and t < best_t: best_t = t best_node = shape.parent if shape.parent else shape if best_node is not None: event = InputEvent(screen_pos, button, origin, direction, best_t) best_node.picked(event)
[docs] def get_group(self, name: str) -> list[Node]: """Get all nodes in a group.""" return list(self._groups.get(name, ()))
# --- Autoloads (singletons that persist across scene changes) ---
[docs] @property def autoloads(self) -> dict[str, Node]: """Read-only view of registered autoloads.""" return self._autoloads
[docs] def add_autoload(self, name: str, node: Node): """Register ``node`` as a persistent singleton attached to the tree. The node enters the tree and runs ``ready()`` immediately. Unlike a regular child, it is not reachable via the scene root — retrieve it via ``tree.autoloads[name]``. Autoloads survive ``change_scene()``, making them the canonical home for global state (score, settings, audio manager). See :doc:`../patterns`. The name ``"events"`` is reserved for the engine-provided :class:`~simvx.core.event_bus.EventBus`; use ``tree.events`` instead. """ if name == RESERVED_AUTOLOAD_EVENTS: raise ValueError( "Autoload name 'events' is reserved for the engine-provided " "EventBus. Access it via tree.events; pick a different name " "for your autoload." ) self._autoloads[name] = node node._enter_tree(self) node._ready_recursive()
[docs] def remove_autoload(self, name: str): """Unregister and tear down an autoload. Calls ``_exit_tree`` on the node.""" node = self._autoloads.pop(name, None) if node: node._exit_tree()
# --- Unique nodes ---
[docs] def get_unique(self, name: str) -> Node | None: """Get a unique node by name. Returns None if not found.""" return self._unique_nodes.get(name)
# --- Internal helpers --- def _group_add(self, group: str, node: Node): if group not in self._groups: self._groups[group] = set() self._groups[group].add(node) def _group_remove(self, group: str, node: Node): if group in self._groups: self._groups[group].discard(node) def _reregister_groups(self, node: Node): """Re-add a node (and descendants) to the group index.""" for group in node._groups: self._group_add(group, node) for child in node.children: self._reregister_groups(child) def _reregister_unique(self, node: Node): """Re-add a node (and descendants) to the unique-node index.""" if node.unique_name: self._unique_nodes[node.name] = node for child in node.children: self._reregister_unique(child) def _invalidate_draw_caches(self): """Invalidate draw caches of Controls whose rect depends on screen/parent size. Only anchored controls (any of anchor_{left,top,right,bottom} != 0) have their absolute rect affected by a screen-size change. Controls with the default zero anchors have rects computed purely from their own position/size, so their caches stay valid here — own-size changes are handled by Property setters and own-position changes by _invalidate_transform. Uses an iterative stack to avoid Python recursion overhead on deep trees. """ if not self.root: return from .ui import Control stack = [self.root] while stack: node = stack.pop() if isinstance(node, Control) and node._draw_cache is not None: if (node.anchor_left or node.anchor_top or node.anchor_right or node.anchor_bottom): node._draw_dirty = True node._draw_cache = None children = node.children if children: stack.extend(children) def _queue_delete(self, node: Node): self._delete_queue.append(node) def _flush_deletes(self): for node in self._delete_queue: if node.parent: node.parent.remove_child(node) self._delete_queue.clear() def _collect_render_queue(self): """Collect all renderable nodes into batches (backend use only). Returns: List of RenderBatch objects, sorted by material/blend mode. Backends call this once per frame to get optimized draw list. """ from .render_queue import RenderQueue queue = RenderQueue() if self.root: self._collect_meshes_recursive(self.root, queue) return queue.sorted_batches def _collect_meshes_recursive(self, node: Node, queue): """Traverse tree collecting MeshInstance3D nodes into batches.""" if isinstance(node, MeshInstance3D) and node.mesh is not None: queue.add_instance(node.mesh, node.material, node.model_matrix, node.layer if hasattr(node, 'layer') else 0) for child in node.children: self._collect_meshes_recursive(child, queue) # ======================================================================== # UI Input System (delegated to UIInputManager) # ========================================================================
[docs] def ui_input(self, mouse_pos: tuple[float, float] | np.ndarray = None, button: int = 0, pressed: bool = True, key: str = "", char: str = ""): """Route UI input events to controls.""" self._ui.ui_input(self.root, mouse_pos=mouse_pos, button=button, pressed=pressed, key=key, char=char)
[docs] def touch_input(self, finger_id: int, action: int, x: float, y: float): """Route multi-touch events to controls with touch_mode='multi'.""" self._ui.touch_input(self.root, finger_id, action, x, y)
def _set_focused_control(self, control): """Set the focused control (forwarded to UIInputManager).""" self._ui._set_focused_control(control) def _update_mouse_over_states(self, mouse_pos): """Update mouse_over state for all controls (forwarded to UIInputManager).""" self._ui._update_mouse_over_states(self.root, mouse_pos) def _find_control_at_point(self, point): """Find topmost control at screen position (forwarded to UIInputManager).""" return self._ui._find_control_at_point(self.root, point)