Source code for simvx.core.node

"""Node — Base node class with tree hierarchy, groups, and coroutine support."""

import ast
import functools
import inspect
import logging
import textwrap
import warnings
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, ClassVar

from .descriptors import Children, Coroutine, CoroutineHandle, Notification, ProcessMode, Property, Signal
from .events import InputEvent, TreeInputEvent

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from .scene_tree import SceneTree

# Lazy-cached reference for circular import
_ui_Control: type | None = None

def _get_control() -> type:
    global _ui_Control
    if _ui_Control is None:
        from .ui import Control
        _ui_Control = Control
    return _ui_Control

def _init_calls_super(func) -> bool:
    """Check via AST whether *func* contains a super().__init__(...) call."""
    try:
        source = textwrap.dedent(inspect.getsource(func))
        tree = ast.parse(source)
    except (OSError, TypeError, IndentationError):
        return True  # Cannot inspect — assume user handles super
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        f = node.func
        # super().__init__(...) or super(Cls, self).__init__(...)
        if isinstance(f, ast.Attribute) and f.attr == "__init__" and isinstance(f.value, ast.Call):
            inner = f.value
            if isinstance(inner.func, ast.Name) and inner.func.id == "super":
                return True
    return False

[docs] class Node: """Base node with tree hierarchy, groups, and coroutine support. Attributes: name: Unique name within the parent's children. Defaults to the class name. parent: The parent ``Node``, or ``None`` if this is the root. children: Ordered collection of child nodes, accessible by name or index. visible: Whether this node (and its descendants) should be drawn. process_mode: Controls processing behaviour during pause (``INHERIT``, ``PAUSABLE``, ``PAUSED_ONLY``, ``ALWAYS``, ``DISABLED``). script: Optional file path to an attached script. unique_name: When ``True``, the node is registered in the tree for fast lookup via ``SceneTree.get_unique_node()``. Example:: root = Node(name="Root") child = Node(name="Child") root.add_child(child) assert child.parent is root assert root.children["Child"] is child """ _registry: ClassVar[dict[str, type]] = {} strict_errors: ClassVar[bool] = True # Raise on script errors; set False for release script_error_raised = Signal() # emits (node, method_name, traceback_str) # Engine kwargs consumed by Node.__init__ — not forwarded to user __init__ unless explicitly accepted _NODE_INIT_KWARGS = frozenset({"name"})
[docs] def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) Node._registry[cls.__name__] = cls # Auto-super: wrap user __init__ that doesn't call super().__init__ if "__init__" not in cls.__dict__: return # No custom __init__ — nothing to wrap if cls.__dict__.get("__auto_init__") is False: return # Opted out user_init = cls.__dict__["__init__"] if _init_calls_super(user_init): return # User handles super() — don't wrap user_sig = inspect.signature(user_init) user_params = user_sig.parameters # Determine which params (beyond self) the user accepts user_param_names = [n for n in user_params if n != "self"] has_var_keyword = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in user_params.values()) has_var_positional = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in user_params.values()) # Positional param names (POSITIONAL_ONLY or POSITIONAL_OR_KEYWORD), in order positional_names = [ n for n, p in user_params.items() if n != "self" and p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD) ] # Find the parent __init__ to call (the next in MRO that is not the user's) parent_init = None for base in cls.__mro__[1:]: if "__init__" in base.__dict__: parent_init = base.__dict__["__init__"] break if parent_init is None: parent_init = Node.__init__ # Inspect parent __init__ signature to know what it accepts. # follow_wrapped=False gets the actual wrapper signature (not the original # user function's signature that functools.wraps copies). try: parent_sig = inspect.signature(parent_init, follow_wrapped=False) parent_param_names = {n for n in parent_sig.parameters if n != "self"} parent_has_var_kw = any( p.kind == inspect.Parameter.VAR_KEYWORD for p in parent_sig.parameters.values() ) except (ValueError, TypeError): parent_param_names = set() parent_has_var_kw = True # Assume flexible @functools.wraps(user_init) def _wrapped_init(self, *args, **all_kwargs): # Map positional args to their parameter names so we can split by name user_args = args if args and not has_var_positional: for i, val in enumerate(args): if i < len(positional_names): pname = positional_names[i] if pname in all_kwargs: raise TypeError(f"__init__() got multiple values for argument '{pname}'") all_kwargs[pname] = val user_args = () # All positional mapped to kwargs # Split kwargs: parent_kw goes to parent __init__, user_fwd goes to user's __init__ props = cls.get_properties() parent_kw = {} user_fwd = {} for k, v in all_kwargs.items(): is_user_param = k in user_param_names is_parent_named = k in parent_param_names # Explicitly named in parent sig is_prop = k in props is_engine = k in Node._NODE_INIT_KWARGS if is_user_param: user_fwd[k] = v if is_parent_named or is_prop or is_engine: parent_kw[k] = v # Also pass to parent (e.g. 'name', properties) elif is_parent_named or is_prop or is_engine: parent_kw[k] = v # Known parent/engine/property kwarg elif has_var_keyword: user_fwd[k] = v # Unknown kwarg, user accepts **kwargs elif parent_has_var_kw: parent_kw[k] = v # Fallback to parent **kwargs (will warn) else: raise TypeError(f"{cls.__name__}.__init__() got unexpected keyword argument {k!r}") # Initialise via parent chain (e.g. Node2D.__init__ -> Node.__init__) parent_init(self, **parent_kw) # Forward to user's __init__ if has_var_keyword or has_var_positional: user_init(self, *user_args, **user_fwd) elif user_param_names: forward = {k: v for k, v in user_fwd.items() if k in user_param_names} user_init(self, *user_args, **forward) else: user_init(self) cls.__init__ = _wrapped_init
def __init__(self, name: str = "", **kwargs): if name and not isinstance(name, str): raise TypeError(f"Node name must be a string, got {type(name).__name__}") self._name = name or type(self).__name__ self.parent: Node | None = None self.children = Children() self._tree: SceneTree | None = None self._coroutines: list[Coroutine] = [] self._groups: set[str] = set() self._scene_template_path: str | None = None self.script: str | None = None self._script_embedded: str | None = None # source stored in scene, importable self._script_module = None # ModuleType | None — cached loaded module self._script_original_class: type | None = None # original class before script swap self._visible: bool = True self._visible_in_hierarchy: bool = True self._process_mode: ProcessMode = ProcessMode.INHERIT self._cached_process_mode: ProcessMode | None = None # cached resolved mode self.unique_name: bool = False self._script_error: bool = False self._outgoing_connections: list = [] # signals connected via this node's bound methods # Apply Property values passed as kwargs props = self.get_properties() for key, val in kwargs.items(): if key in props: setattr(self, key, val) else: warnings.warn(f"{type(self).__name__}: unknown kwarg {key!r}", stacklevel=2) @property def name(self) -> str: return self._name
[docs] @name.setter def name(self, value: str): old = self._name self._name = value parent = getattr(self, 'parent', None) if parent is not None: names = parent.children._names if old and names.get(old) is self: del names[old] if value: names[value] = self
@property def process_mode(self) -> ProcessMode: return self._process_mode
[docs] @process_mode.setter def process_mode(self, value: ProcessMode): self._process_mode = value self._invalidate_process_mode_cache()
@property def visible(self) -> bool: return self._visible
[docs] @visible.setter def visible(self, value: bool): value = bool(value) if value == self._visible: return self._visible = value parent_effective = True if self.parent is None else self.parent._visible_in_hierarchy self._propagate_visibility(parent_effective) self._notification(Notification.VISIBILITY_CHANGED)
def _propagate_visibility(self, parent_effective: bool) -> None: """Update _visible_in_hierarchy for self and descendants. Propagates the effective visibility down the subtree; prunes branches whose effective state didn't change so toggles cost O(changed-subtree) rather than O(whole-subtree). """ new_effective = parent_effective and self._visible if self._visible_in_hierarchy == new_effective: return self._visible_in_hierarchy = new_effective for child in self.children: child._propagate_visibility(new_effective) def _invalidate_process_mode_cache(self): """Clear cached process mode for this node and descendants that inherit.""" self._cached_process_mode = None for child in self.children: if child._process_mode == ProcessMode.INHERIT: child._invalidate_process_mode_cache() def _notification(self, what: Notification) -> None: """Called when a notification is dispatched. Override to handle."""
[docs] def reset_error(self) -> None: """Clear script error flag to re-enable processing.""" self._script_error = False
def _safe_call(self, method, *args: Any) -> None: """Call a lifecycle method with error recovery.""" if self._script_error: return try: method(*args) except AssertionError: raise except Exception: if Node.strict_errors: raise self._script_error = True import sys import traceback tb = traceback.format_exc() # Always print to stderr so errors are never invisible print(f"Script error in {self.name}.{method.__name__} — node disabled:\n{tb}", file=sys.stderr) log.error("Script error in %s.%s — node disabled", self.name, method.__name__) try: Node.script_error_raised.emit(self, method.__name__, tb) except Exception: # justified: signal-handler errors must not derail error recovery itself pass
[docs] def add_child(self, node: Node) -> Node: """Add a node as a child, reparenting it if already in a tree. Args: node: The node to add. Removed from its current parent first. """ if node.parent: node.parent.remove_child(node) node.parent = self self.children._add(node) node._notification(Notification.PARENTED) node._invalidate_process_mode_cache() node._propagate_visibility(self._visible_in_hierarchy) if hasattr(node, '_invalidate_transform'): node._invalidate_transform() if self._tree: self._tree._structure_version += 1 node._enter_tree(self._tree) node._ready_recursive() return node
[docs] def remove_child(self, node: Node): """Remove a child node from this node's children.""" if node in self.children: if self._tree: self._tree._structure_version += 1 node._exit_tree() self.children._remove(node) node._notification(Notification.UNPARENTED) node.parent = None node._invalidate_process_mode_cache() node._propagate_visibility(True)
[docs] def reparent(self, new_parent: Node): """Remove from current parent and add to new_parent.""" if self.parent: self.parent.remove_child(self) new_parent.add_child(self)
[docs] def get_node(self, path: str) -> Node: """Navigate tree by path: 'Child/GrandChild' or '../Sibling'.""" current = self parts = [p for p in path.split('/') if p] if path.startswith('/'): while current.parent: current = current.parent # An absolute path may optionally name the root as its first segment # (e.g. '/Root/Player'). Consume it once so it resolves to root. if parts and parts[0] == current.name: parts.pop(0) for part in parts: if part == '..': current = current.parent if current is None: raise ValueError("Already at root") else: current = current.children[part] return current
[docs] def find_child(self, name: str, recursive: bool = False) -> Node | None: """Find first child with the given name.""" for c in self.children: if c.name == name: return c if recursive: found = c.find_child(name, recursive=True) if found: return found return None
[docs] def find(self, node_type: type, recursive: bool = True) -> Node | None: """Find first descendant of type (recursive by default).""" for child in self.children: if isinstance(child, node_type): return child if recursive: found = child.find(node_type, recursive=True) if found: return found return None
[docs] def find_all(self, node_type: type, recursive: bool = True) -> list: """Find all descendants of type.""" result = [] for child in self.children: if isinstance(child, node_type): result.append(child) if recursive: result.extend(child.find_all(node_type)) return result
[docs] def walk(self, *, include_self: bool = True) -> Iterator[Node]: """Iterate this node and all descendants in DFS pre-order.""" if include_self: yield self for child in self.children: yield from child.walk(include_self=True)
[docs] @property def path(self) -> str: if self.parent is None: return f"/{self.name}" return f"{self.parent.path}/{self.name}"
# --- Groups ---
[docs] def add_to_group(self, group: str): """Add this node to a named group.""" self._groups.add(group) if self._tree: self._tree._group_add(group, self)
[docs] def remove_from_group(self, group: str): """Remove this node from a named group.""" self._groups.discard(group) if self._tree: self._tree._group_remove(group, self)
[docs] def is_in_group(self, group: str) -> bool: """Check if this node belongs to a named group.""" return group in self._groups
# --- Lifecycle (override in subclasses) ---
[docs] def ready(self) -> None: """Called once after the node and all its children enter the scene tree. Override to perform initialisation that requires the scene tree -- finding sibling nodes, connecting signals, spawning children. The ``tree`` property is available. Called after ``enter_tree()`` and after all children's ``ready()``. Note: Fires again if the node is removed and re-added to the tree. Example:: def ready(self): self.sprite = self.get_node("Sprite") self.health_changed.connect(self._update_hud) """
[docs] def enter_tree(self) -> None: """Called when the node enters the scene tree, before ``ready()``. Override for setup that must happen the moment the tree reference becomes available. Children have not entered yet at this point, so avoid querying child nodes here -- use ``ready()`` instead. Example:: def enter_tree(self): self.add_to_group("enemies") """
[docs] def exit_tree(self) -> None: """Called when the node is about to leave the scene tree. Override to clean up resources, disconnect external signals, or persist state. Children have already exited by the time this fires on the parent. Example:: def exit_tree(self): self.save_progress() self.remove_from_group("enemies") """
[docs] def process(self, dt: float) -> None: """Called every frame for game logic. Args: dt: Seconds elapsed since the previous frame (variable timestep). Override for movement, AI, animation triggers, or any per-frame update. Obeys ``process_mode`` -- disabled or paused nodes are skipped automatically. Example:: def process(self, dt): self.position += self.velocity * dt """
[docs] def physics_process(self, dt: float) -> None: """Called at a fixed timestep (default 60 Hz) for physics logic. Args: dt: Fixed time step in seconds (e.g. 1/60). Override for deterministic physics updates -- forces, collision responses, rigid-body integration. Runs independently of the render frame rate. Example:: def physics_process(self, dt): self.velocity += self.gravity * dt self.move_and_slide() """
[docs] def draw(self, renderer) -> None: """Called each frame for custom 2D drawing. Args: renderer: The active draw-command recorder (e.g. ``Draw2D``). Override to issue immediate-mode draw calls such as ``draw_line``, ``draw_rect``, or ``draw_text``. Called only when ``visible`` is ``True``. Example:: def draw(self, renderer): renderer.draw_circle(self.world_position, 10, colour=(1, 0, 0, 1)) """
[docs] def picked(self, event: InputEvent) -> None: """Called when a 3D mouse-pick event hits this node's collision shape. Args: event: The input event containing click position, camera ray, etc. Override to react to direct interaction with this 3D object -- selection, dragging, context menus. Example:: def picked(self, event): if event.button == MouseButton.LEFT: self.selected = True """
[docs] def handle_input(self, event: TreeInputEvent) -> None: """Called for input events propagated through the scene tree. Args: event: A tree-level input event. Set ``event.handled = True`` to stop propagation. Events travel from leaf nodes to root (front-to-back). If any node marks the event as handled, subsequent nodes receive it only via ``unhandled_input()``. Example:: def handle_input(self, event): if event.key == "escape": self.pause_menu.show() event.handled = True """
[docs] def unhandled_input(self, event: TreeInputEvent) -> None: """Called for input events that no other node has handled. Args: event: The unhandled tree-level input event. Use for catch-all bindings such as global shortcuts or debug toggles that should only fire when no UI element consumed the event. Example:: def unhandled_input(self, event): if event.key == "f3": self.toggle_debug_overlay() """
# --- Coroutine support ---
[docs] def start_coroutine(self, gen: Coroutine) -> CoroutineHandle: """Register a generator coroutine to run each frame. Returns a cancellable handle.""" handle = CoroutineHandle(gen) self._coroutines.append(handle) return handle
[docs] def stop_coroutine(self, gen_or_handle): """Stop and remove a running coroutine (accepts generator or CoroutineHandle).""" if isinstance(gen_or_handle, CoroutineHandle): gen_or_handle.cancel() if gen_or_handle in self._coroutines: self._coroutines.remove(gen_or_handle) else: # Legacy: search by generator for h in self._coroutines: if isinstance(h, CoroutineHandle) and h._gen is gen_or_handle: h.cancel() self._coroutines.remove(h) return elif h is gen_or_handle: self._coroutines.remove(h) return
def _tick_coroutines(self, dt: float): if not self._coroutines: return finished = [] for item in self._coroutines: if isinstance(item, CoroutineHandle): if item.is_cancelled: finished.append(item) continue gen = item._gen else: gen = item try: next(gen) except StopIteration: finished.append(item) for item in finished: self._coroutines.remove(item) # --- Tree internals --- def _enter_tree(self, tree: SceneTree): self._tree = tree # Instantiate declared Child descriptors declared = getattr(type(self), '_declared_children', None) if declared: for attr_name, child_desc in declared.items(): if self.__dict__.get(attr_name) is None: kwargs = dict(child_desc._kwargs) if 'name' not in kwargs: kwargs['name'] = attr_name node = child_desc._type(*child_desc._args, **kwargs) self.__dict__[attr_name] = node self.children._add(node) node.parent = self if self.unique_name: tree._unique_nodes[self.name] = self for group in self._groups: tree._group_add(group, self) self._notification(Notification.ENTER_TREE) self.enter_tree() for child in self.children: child._enter_tree(tree) def _exit_tree(self): for child in self.children: child._exit_tree() self._notification(Notification.EXIT_TREE) self.exit_tree() if self._tree: if self.unique_name: self._tree._unique_nodes.pop(self.name, None) for group in self._groups: self._tree._group_remove(group, self) self._tree = None def _ready_recursive(self): for child in self.children: child._ready_recursive() self._notification(Notification.READY) self._safe_call(self.ready) def _effective_process_mode(self) -> ProcessMode: """Resolve INHERIT by walking up the tree (cached).""" cached = self._cached_process_mode if cached is not None: return cached mode = self._process_mode if mode == ProcessMode.INHERIT: mode = self.parent._effective_process_mode() if self.parent else ProcessMode.PAUSABLE self._cached_process_mode = mode return mode def _can_process(self, paused: bool) -> bool: """Check if this node should process given the tree's pause state.""" mode = self._effective_process_mode() if mode == ProcessMode.DISABLED: return False if mode == ProcessMode.ALWAYS: return True if mode == ProcessMode.PAUSED_ONLY: return paused # PAUSABLE (or resolved INHERIT → PAUSABLE) return not paused def _process_recursive(self, dt: float, paused: bool = False): if self._script_error: return # Inlined _can_process: resolve mode from cache and check pause state mode = self._cached_process_mode if mode is None: mode = self._effective_process_mode() if mode != ProcessMode.DISABLED and ( mode == ProcessMode.ALWAYS or (not paused if mode == ProcessMode.PAUSABLE else paused) ): self._notification(Notification.PROCESS) try: self.process(dt) except AssertionError: raise except Exception: if Node.strict_errors: raise self._script_error = True import sys import traceback tb = traceback.format_exc() print(f"Script error in {self.name}.process — node disabled:\n{tb}", file=sys.stderr) log.error("Script error in %s.process — node disabled", self.name) try: Node.script_error_raised.emit(self, "process", tb) except Exception: # justified: signal-handler error during process; node already disabled, recursion continues pass return if self._coroutines: self._tick_coroutines(dt) for child in self.children.safe_iter(): child._process_recursive(dt, paused) def _physics_process_recursive(self, dt: float, paused: bool = False): if self._script_error: return # Inlined _can_process: resolve mode from cache and check pause state mode = self._cached_process_mode if mode is None: mode = self._effective_process_mode() if mode != ProcessMode.DISABLED and ( mode == ProcessMode.ALWAYS or (not paused if mode == ProcessMode.PAUSABLE else paused) ): self._notification(Notification.PHYSICS_PROCESS) try: self.physics_process(dt) except AssertionError: raise except Exception: if Node.strict_errors: raise self._script_error = True import sys import traceback tb = traceback.format_exc() print(f"Script error in {self.name}.physics_process — node disabled:\n{tb}", file=sys.stderr) log.error("Script error in %s.physics_process — node disabled", self.name) try: Node.script_error_raised.emit(self, "physics_process", tb) except Exception: # justified: signal-handler error during physics_process; node already disabled, recursion continues pass return for child in self.children.safe_iter(): child._physics_process_recursive(dt, paused) def _draw_recursive(self, renderer): if not self.visible: return if self._script_error: for child in self.children.safe_iter(): child._draw_recursive(renderer) return self._safe_call(self.draw, renderer) for child in self.children.safe_iter(): child._draw_recursive(renderer) def _propagate_input(self, event: TreeInputEvent) -> None: """Walk tree front-to-back: children first (reversed), then self. Calls ``handle_input()`` on each node until ``event.handled`` is set. Then calls ``unhandled_input()`` on remaining nodes. """ for child in reversed(list(self.children)): child._propagate_input(event) if not event.handled: self.handle_input(event) self.unhandled_input(event)
[docs] def clear_children(self): """Destroy all children of this node.""" for child in list(self.children): child.destroy()
[docs] def destroy(self): """Schedule this node for removal at the end of the current frame. Signal connections made through this node's bound methods are proactively disconnected so emitters stop dispatching to it on the next emit (Godot 4 behaviour). Lazy weak-ref cleanup in ``Signal.__call__`` covers nodes that are GC'd without ``destroy()``. """ for conn in list(self._outgoing_connections): conn.disconnect() self._outgoing_connections.clear() if self._tree: self._tree._queue_delete(self)
[docs] @property def app(self): """The App running this node's scene tree. Available after enter_tree().""" return self._tree.app if self._tree else None
[docs] @property def tree(self) -> SceneTree: """The SceneTree this node belongs to.""" return self._tree
[docs] def __getitem__(self, key: str): """Shorthand for get_node: ``self["Child/Path"]``.""" return self.get_node(key)
[docs] @classmethod def get_properties(cls) -> dict[str, Property]: """Return all Property descriptors declared on this node class and its bases.""" return getattr(cls, '__properties__', {})
[docs] def __repr__(self): return f"<{type(self).__name__} '{self.name}'>"
# Register Node itself (not covered by __init_subclass__ which only fires for subclasses) Node._registry["Node"] = Node # ============================================================================ # Timer # ============================================================================
[docs] class Timer(Node): """Fires timeout signal after duration. Supports one-shot and repeating.""" duration = Property(1.0, range=(0.001, 3600)) one_shot = Property(True) autostart = Property(False) def __init__(self, duration: float = 1.0, one_shot: bool = True, autostart: bool = False, **kwargs): super().__init__(**kwargs) self.duration = duration self.one_shot = one_shot self.timeout = Signal() self._time_left = duration if autostart else 0.0 self._running = autostart
[docs] def start(self, duration: float = 0): """Start or restart the timer, optionally overriding duration.""" if duration > 0: self.duration = duration self._time_left = self.duration self._running = True
[docs] def stop(self): """Stop the timer and reset time_left to zero.""" self._running = False self._time_left = 0.0
[docs] @property def stopped(self) -> bool: return not self._running
[docs] @property def time_left(self) -> float: return self._time_left
[docs] def process(self, dt: float): if not self._running: return self._time_left -= dt if self._time_left <= 0: self.timeout() if self.one_shot: self._running = False else: self._time_left += self.duration