"""SceneTree — Central manager for the node tree, groups, input routing, and UI focus."""
import logging
from contextlib import contextmanager
from typing import Any
import numpy as np
from .descriptors import Signal
from .event_bus import EventBus
from .events import InputEvent
from .math.raycast import ray_intersect_sphere, screen_to_ray
from .input.enums import MouseButton
from .node import Node
from .nodes_2d.camera import Camera2D
from .nodes_3d.camera import Camera3D
from .nodes_3d.mesh import MeshInstance3D
from .physics_nodes import CollisionShape3D
# The "events" autoload name is reserved for the engine-provided EventBus.
# Project TOMLs and runtime code may not declare or replace it.
RESERVED_AUTOLOAD_EVENTS = "events"
log = logging.getLogger(__name__)
[docs]
class SceneTree:
"""Central manager for the node tree, groups, input routing, and UI focus.
Owns the root node and drives per-frame ``process`` / ``physics_process`` /
``draw`` traversals. Also manages pause state, scene changes, and the UI
input pipeline (mouse, keyboard, popups).
"""
[docs]
@property
def app(self):
"""The App instance running this tree (set by graphics backend)."""
return getattr(self, "_app", None)
[docs]
@property
def events(self) -> EventBus:
"""Engine-provided typed event bus.
Use ``tree.events.connect(EventCls, handler)`` to register a typed
handler and ``tree.events.emit(event)`` (or ``emit_deferred``) to
publish. The bus survives ``change_scene()`` -- connections held by
autoloads or other long-lived objects keep firing across scene swaps.
Deferred events queued during a frame are dispatched at the start of
the next ``process()`` tick, before any node ``_process`` runs.
"""
return self._events
def __init__(self, screen_size=None, *, isolated_input: bool = False):
if isolated_input:
from .input.map import _InputMap
from .input.state import _Input
self._own_input_map = _InputMap()
self._own_input = _Input(input_map=self._own_input_map)
else:
from .input.map import _default_input_map
from .input.state import _default_input
self._own_input_map = _default_input_map
self._own_input = _default_input
self.root: Node | None = None
self._screen_size: tuple[float, float] = SceneTree._normalize_size(screen_size or (800, 600))
self.paused: bool = False
self._delete_queue: list[Node] = []
self._groups: dict[str, set[Node]] = {}
self._autoloads: dict[str, Node] = {}
# Engine-provided typed event bus, accessible as ``tree.events``. Held
# outside ``_autoloads`` because EventBus is not a Node and does not
# participate in the process/physics traversal -- the per-frame tick
# explicitly calls ``self._events.flush_deferred()``. The name
# ``events`` is reserved (see add_autoload() and project.py).
self._events: EventBus = EventBus()
self._unique_nodes: dict[str, Node] = {}
from .ui.ui_input import UIInputManager # local import avoids cycle via ui/testing.py
self._ui = UIInputManager()
self._current_camera_2d: Camera2D | None = None
self.auto_physics: bool = True # Automatically step PhysicsServer each physics tick
self.overlay_offset: tuple[float, float] = (0.0, 0.0) # 2D offset for Text2D/particle overlays
self.play_viewport_rect: tuple[float, float, float, float] | None = None # (x, y, w, h) to constrain 3D rendering
self._structure_version: int = 0 # Incremented on add_child/remove_child for cache invalidation
self._running: bool = True
self.quit_requested: Signal = Signal()
# -- UI state forwarding (preserves external interface) --
@property
def _focused_control(self):
return self._ui._focused_control
@_focused_control.setter
def _focused_control(self, v):
self._ui._focused_control = v
@property
def _mouse_grab(self) -> Any:
return self._ui._mouse_grab
@_mouse_grab.setter
def _mouse_grab(self, v):
self._ui._mouse_grab = v
@property
def _last_mouse_pos(self):
return self._ui._last_mouse_pos
@_last_mouse_pos.setter
def _last_mouse_pos(self, v):
self._ui._last_mouse_pos = v
@property
def _popup_stack(self) -> list:
return self._ui._popup_stack
@_popup_stack.setter
def _popup_stack(self, v):
self._ui._popup_stack = v
@property
def _shortcut_handler(self):
return self._ui._shortcut_handler
@_shortcut_handler.setter
def _shortcut_handler(self, v):
self._ui._shortcut_handler = v
@staticmethod
def _normalize_size(sz) -> tuple[float, float]:
"""Coerce any screen_size representation to a plain tuple once."""
if isinstance(sz, tuple) and len(sz) == 2:
return sz
if hasattr(sz, 'x'):
return (float(sz.x), float(sz.y))
return (float(sz[0]), float(sz[1]))
[docs]
@property
def is_running(self) -> bool:
"""Whether the tree is actively being ticked by its driving app.
Set to False by :meth:`quit` (or the graphics backend's app ``quit()``),
signalling the main loop to exit at the end of the current frame.
"""
return self._running
[docs]
def quit(self) -> None:
"""Request a clean shutdown of the running tree.
Emits :attr:`quit_requested` and flips :attr:`is_running` to False. The
driving app polls this state and exits its loop at the end of the
current frame. Safe to call from node callbacks or signal handlers.
"""
if not self._running:
return
self._running = False
self.quit_requested()
@property
def screen_size(self) -> tuple[float, float]:
return self._screen_size
[docs]
@screen_size.setter
def screen_size(self, value):
old = self._screen_size
self._screen_size = SceneTree._normalize_size(value)
if self._screen_size != old:
self._invalidate_draw_caches()
[docs]
def set_root(self, root: Node):
"""Set the root node of the scene tree."""
log.debug("SceneTree.set_root(%s)", root)
self.root = root
root._enter_tree(self)
root._ready_recursive()
[docs]
def change_scene(self, new_root: Node):
"""Swap the active root with ``new_root``.
The old root receives ``_exit_tree``; ``new_root`` then runs the full
``_enter_tree`` / ``_ready_recursive`` path, identical to the initial
root. Autoloads are left in place and their groups and unique-name
entries are re-registered on the rebuilt tree. Pending deletes, UI
popup state, and the active 2D camera are cleared.
Use this for title → gameplay → game-over navigation. See
:doc:`../patterns` for a full example.
"""
log.debug("SceneTree.change_scene(%s), old root=%s", new_root, self.root)
if self.root:
self.root._exit_tree()
# Rebuild groups from autoloads only (scene groups were cleared by _exit_tree)
self._groups.clear()
for node in self._autoloads.values():
self._reregister_groups(node)
self._unique_nodes.clear()
for node in self._autoloads.values():
self._reregister_unique(node)
self._delete_queue.clear()
self._ui.reset()
self._current_camera_2d = None
self.set_root(new_root)
[docs]
def process(self, dt: float):
"""Run process callbacks and coroutines on all nodes for one frame.
Order: autoloads' ``_process`` first (Godot-style global singletons),
then ``self.events.flush_deferred()`` so deferred events queued during
the previous frame (process, physics, or input) reach all subscribers
before scene logic runs, then the scene root's ``_process``. Anything
``emit_deferred``'d during this frame's autoload pass is also drained
in the same flush, keeping the scene's view of the world consistent.
"""
with self.activate_input():
from .ui.core import Control
Control._current_frame += 1
for node in self._autoloads.values():
node._process_recursive(dt, self.paused)
self._events.flush_deferred()
if self.root:
self.root._process_recursive(dt, self.paused)
self._flush_deletes()
[docs]
def physics_process(self, dt: float):
"""Run physics_process callbacks on all nodes, then auto-step physics."""
with self.activate_input():
for node in self._autoloads.values():
node._physics_process_recursive(dt, self.paused)
if self.root:
self.root._physics_process_recursive(dt, self.paused)
if self.auto_physics:
from .physics.engine import PhysicsServer
server = PhysicsServer.get()
if server and server.body_count > 0:
server.step(dt)
[docs]
def draw(self, renderer):
cam = self._current_camera_2d
_has = hasattr(renderer, 'push_transform')
if _has and cam is not None:
z = cam.zoom if cam.zoom > 0 else 1.0
sw, sh = self._screen_size
sx, sy = cam._shake_offset
renderer.push_transform(
z, 0.0, 0.0, z,
(-float(cam._current[0]) + float(sx)) * z + sw * 0.5,
(-float(cam._current[1]) + float(sy)) * z + sh * 0.5,
)
if self.root:
self.root._draw_recursive(renderer)
if _has and cam is not None:
renderer.pop_transform()
# Draw popups last (on top of everything, in screen space)
if self._ui._popup_stack:
# Force a new rendering layer so popup fills draw over prior lines
if hasattr(renderer, 'new_layer'):
renderer.new_layer()
# Reset clip to full screen so popups aren't clipped by parent containers
if hasattr(renderer, 'reset_clip'):
renderer.reset_clip()
for popup in self._ui._popup_stack:
popup.draw_popup(renderer)
[docs]
def get_group(self, name: str) -> list[Node]:
"""Get all nodes in a group."""
return list(self._groups.get(name, ()))
# --- Autoloads (singletons that persist across scene changes) ---
[docs]
@property
def autoloads(self) -> dict[str, Node]:
"""Read-only view of registered autoloads."""
return self._autoloads
[docs]
def add_autoload(self, name: str, node: Node):
"""Register ``node`` as a persistent singleton attached to the tree.
The node enters the tree and runs ``ready()`` immediately. Unlike a
regular child, it is not reachable via the scene root — retrieve it via
``tree.autoloads[name]``. Autoloads survive ``change_scene()``, making
them the canonical home for global state (score, settings, audio
manager). See :doc:`../patterns`.
The name ``"events"`` is reserved for the engine-provided
:class:`~simvx.core.event_bus.EventBus`; use ``tree.events`` instead.
"""
if name == RESERVED_AUTOLOAD_EVENTS:
raise ValueError(
"Autoload name 'events' is reserved for the engine-provided "
"EventBus. Access it via tree.events; pick a different name "
"for your autoload."
)
self._autoloads[name] = node
node._enter_tree(self)
node._ready_recursive()
[docs]
def remove_autoload(self, name: str):
"""Unregister and tear down an autoload. Calls ``_exit_tree`` on the node."""
node = self._autoloads.pop(name, None)
if node:
node._exit_tree()
# --- Unique nodes ---
[docs]
def get_unique(self, name: str) -> Node | None:
"""Get a unique node by name. Returns None if not found."""
return self._unique_nodes.get(name)
# --- Internal helpers ---
def _group_add(self, group: str, node: Node):
if group not in self._groups:
self._groups[group] = set()
self._groups[group].add(node)
def _group_remove(self, group: str, node: Node):
if group in self._groups:
self._groups[group].discard(node)
def _reregister_groups(self, node: Node):
"""Re-add a node (and descendants) to the group index."""
for group in node._groups:
self._group_add(group, node)
for child in node.children:
self._reregister_groups(child)
def _reregister_unique(self, node: Node):
"""Re-add a node (and descendants) to the unique-node index."""
if node.unique_name:
self._unique_nodes[node.name] = node
for child in node.children:
self._reregister_unique(child)
def _invalidate_draw_caches(self):
"""Invalidate draw caches of Controls whose rect depends on screen/parent size.
Only anchored controls (any of anchor_{left,top,right,bottom} != 0) have
their absolute rect affected by a screen-size change. Controls with the
default zero anchors have rects computed purely from their own
position/size, so their caches stay valid here — own-size changes are
handled by Property setters and own-position changes by
_invalidate_transform.
Uses an iterative stack to avoid Python recursion overhead on deep trees.
"""
if not self.root:
return
from .ui import Control
stack = [self.root]
while stack:
node = stack.pop()
if isinstance(node, Control) and node._draw_cache is not None:
if (node.anchor_left or node.anchor_top
or node.anchor_right or node.anchor_bottom):
node._draw_dirty = True
node._draw_cache = None
children = node.children
if children:
stack.extend(children)
def _queue_delete(self, node: Node):
self._delete_queue.append(node)
def _flush_deletes(self):
for node in self._delete_queue:
if node.parent:
node.parent.remove_child(node)
self._delete_queue.clear()
def _collect_render_queue(self):
"""Collect all renderable nodes into batches (backend use only).
Returns:
List of RenderBatch objects, sorted by material/blend mode.
Backends call this once per frame to get optimized draw list.
"""
from .render_queue import RenderQueue
queue = RenderQueue()
if self.root:
self._collect_meshes_recursive(self.root, queue)
return queue.sorted_batches
def _collect_meshes_recursive(self, node: Node, queue):
"""Traverse tree collecting MeshInstance3D nodes into batches."""
if isinstance(node, MeshInstance3D) and node.mesh is not None:
queue.add_instance(node.mesh, node.material, node.model_matrix, node.layer if hasattr(node, 'layer') else 0)
for child in node.children:
self._collect_meshes_recursive(child, queue)
# ========================================================================
# UI Input System (delegated to UIInputManager)
# ========================================================================
def _set_focused_control(self, control):
"""Set the focused control (forwarded to UIInputManager)."""
self._ui._set_focused_control(control)
def _update_mouse_over_states(self, mouse_pos):
"""Update mouse_over state for all controls (forwarded to UIInputManager)."""
self._ui._update_mouse_over_states(self.root, mouse_pos)
def _find_control_at_point(self, point):
"""Find topmost control at screen position (forwarded to UIInputManager)."""
return self._ui._find_control_at_point(self.root, point)