"""Node — Base node class with tree hierarchy, groups, and coroutine support."""
import ast
import functools
import inspect
import logging
import textwrap
import warnings
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, ClassVar
from .descriptors import Children, Coroutine, CoroutineHandle, Notification, ProcessMode, Property, Signal
from .events import InputEvent, TreeInputEvent
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# SceneTree is referenced only in annotations, so it is imported solely for
# type checkers and never at runtime.
if TYPE_CHECKING:
    from .scene_tree import SceneTree
# Lazy-cached reference for circular import
_ui_Control: type | None = None
def _get_control() -> type:
    """Return the ``Control`` class from ``.ui``, importing it on first use.

    The class is stored in the module-level ``_ui_Control`` so the import
    statement runs at most once.
    """
    global _ui_Control
    if _ui_Control is not None:
        return _ui_Control
    from .ui import Control
    _ui_Control = Control
    return _ui_Control
def _init_calls_super(func) -> bool:
"""Check via AST whether *func* contains a super().__init__(...) call."""
try:
source = textwrap.dedent(inspect.getsource(func))
tree = ast.parse(source)
except (OSError, TypeError, IndentationError):
return True # Cannot inspect — assume user handles super
for node in ast.walk(tree):
if not isinstance(node, ast.Call):
continue
f = node.func
# super().__init__(...) or super(Cls, self).__init__(...)
if isinstance(f, ast.Attribute) and f.attr == "__init__" and isinstance(f.value, ast.Call):
inner = f.value
if isinstance(inner.func, ast.Name) and inner.func.id == "super":
return True
return False
[docs]
class Node:
    """Base node with tree hierarchy, groups, and coroutine support.

    Attributes:
        name: Unique name within the parent's children. Defaults to the class name.
        parent: The parent ``Node``, or ``None`` if this is the root.
        children: Ordered collection of child nodes, accessible by name or index.
        visible: Whether this node (and its descendants) should be drawn.
        process_mode: Controls processing behaviour during pause
            (``INHERIT``, ``PAUSABLE``, ``PAUSED_ONLY``, ``ALWAYS``, ``DISABLED``).
        script: Optional file path to an attached script.
        unique_name: When ``True``, the node is registered in the tree for
            fast lookup via ``SceneTree.get_unique_node()``.

    Example::

        root = Node(name="Root")
        child = Node(name="Child")
        root.add_child(child)
        assert child.parent is root
        assert root.children["Child"] is child
    """

    # Maps class name -> class for Node and every subclass (filled by __init_subclass__).
    _registry: ClassVar[dict[str, type]] = {}
    strict_errors: ClassVar[bool] = True # Raise on script errors; set False for release
    script_error_raised = Signal() # emits (node, method_name, traceback_str)
    # Engine kwargs consumed by Node.__init__ — not forwarded to user __init__ unless explicitly accepted
    _NODE_INIT_KWARGS = frozenset({"name"})
[docs]
def __init_subclass__(cls, **kwargs):
    """Register the subclass and, when needed, auto-chain its ``__init__``.

    Every subclass is stored in ``Node._registry`` under its class name.
    If the subclass defines its own ``__init__`` in which
    ``_init_calls_super`` finds no ``super().__init__(...)`` call, that
    ``__init__`` is replaced by a wrapper. The wrapper first calls the
    nearest ``__init__`` found further along the MRO and then the user's
    ``__init__``, splitting the incoming arguments between the two.
    Setting ``__auto_init__ = False`` in the class body opts out.

    Args:
        **kwargs: Forwarded unchanged to ``super().__init_subclass__``.
    """
    super().__init_subclass__(**kwargs)
    Node._registry[cls.__name__] = cls
    # Auto-super: wrap user __init__ that doesn't call super().__init__
    if "__init__" not in cls.__dict__:
        return # No custom __init__ — nothing to wrap
    if cls.__dict__.get("__auto_init__") is False:
        return # Opted out
    user_init = cls.__dict__["__init__"]
    if _init_calls_super(user_init):
        return # User handles super() — don't wrap
    user_sig = inspect.signature(user_init)
    user_params = user_sig.parameters
    # Determine which params (beyond self) the user accepts
    user_param_names = [n for n in user_params if n != "self"]
    has_var_keyword = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in user_params.values())
    has_var_positional = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in user_params.values())
    # Positional param names (POSITIONAL_ONLY or POSITIONAL_OR_KEYWORD), in order
    positional_names = [
        n for n, p in user_params.items()
        if n != "self" and p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    # Find the parent __init__ to call (the next in MRO that is not the user's)
    parent_init = None
    for base in cls.__mro__[1:]:
        if "__init__" in base.__dict__:
            parent_init = base.__dict__["__init__"]
            break
    if parent_init is None:
        parent_init = Node.__init__
    # Inspect parent __init__ signature to know what it accepts.
    # follow_wrapped=False gets the actual wrapper signature (not the original
    # user function's signature that functools.wraps copies).
    try:
        parent_sig = inspect.signature(parent_init, follow_wrapped=False)
        parent_param_names = {n for n in parent_sig.parameters if n != "self"}
        parent_has_var_kw = any(
            p.kind == inspect.Parameter.VAR_KEYWORD for p in parent_sig.parameters.values()
        )
    except (ValueError, TypeError):
        parent_param_names = set()
        parent_has_var_kw = True # Assume flexible

    @functools.wraps(user_init)
    def _wrapped_init(self, *args, **all_kwargs):
        # Map positional args to their parameter names so we can split by name
        user_args = args
        if args and not has_var_positional:
            for i, val in enumerate(args):
                # Positionals beyond the user's named positional params are not mapped.
                if i < len(positional_names):
                    pname = positional_names[i]
                    if pname in all_kwargs:
                        raise TypeError(f"__init__() got multiple values for argument '{pname}'")
                    all_kwargs[pname] = val
            user_args = () # All positional mapped to kwargs
        # Split kwargs: parent_kw goes to parent __init__, user_fwd goes to user's __init__
        props = cls.get_properties()
        parent_kw = {}
        user_fwd = {}
        for k, v in all_kwargs.items():
            is_user_param = k in user_param_names
            is_parent_named = k in parent_param_names # Explicitly named in parent sig
            is_prop = k in props
            is_engine = k in Node._NODE_INIT_KWARGS
            if is_user_param:
                user_fwd[k] = v
                if is_parent_named or is_prop or is_engine:
                    parent_kw[k] = v # Also pass to parent (e.g. 'name', properties)
            elif is_parent_named or is_prop or is_engine:
                parent_kw[k] = v # Known parent/engine/property kwarg
            elif has_var_keyword:
                user_fwd[k] = v # Unknown kwarg, user accepts **kwargs
            elif parent_has_var_kw:
                parent_kw[k] = v # Fallback to parent **kwargs (will warn)
            else:
                raise TypeError(f"{cls.__name__}.__init__() got unexpected keyword argument {k!r}")
        # Initialise via parent chain (e.g. Node2D.__init__ -> Node.__init__)
        parent_init(self, **parent_kw)
        # Forward to user's __init__
        if has_var_keyword or has_var_positional:
            user_init(self, *user_args, **user_fwd)
        elif user_param_names:
            forward = {k: v for k, v in user_fwd.items() if k in user_param_names}
            user_init(self, *user_args, **forward)
        else:
            user_init(self)

    # Install the wrapper in place of the user's __init__.
    cls.__init__ = _wrapped_init
def __init__(self, name: str = "", **kwargs):
    """Initialise node state and apply Property values given as keywords.

    Args:
        name: Node name; an empty value falls back to the class name.
        **kwargs: Values for declared properties (see ``get_properties``).
            Keys that are not declared properties trigger a warning.

    Raises:
        TypeError: If *name* is truthy but not a string.
    """
    if name and not isinstance(name, str):
        raise TypeError(f"Node name must be a string, got {type(name).__name__}")

    # Identity and hierarchy
    self._name = name if name else type(self).__name__
    self.parent: Node | None = None
    self.children = Children()
    self._tree: SceneTree | None = None

    # Coroutines, groups and scene origin
    self._coroutines: list[Coroutine] = []
    self._groups: set[str] = set()
    self._scene_template_path: str | None = None

    # Script attachment
    self.script: str | None = None
    self._script_embedded: str | None = None  # source stored in scene, importable
    self._script_module = None  # ModuleType | None — cached loaded module
    self._script_original_class: type | None = None  # original class before script swap

    # Visibility and processing
    self._visible: bool = True
    self._visible_in_hierarchy: bool = True
    self._process_mode: ProcessMode = ProcessMode.INHERIT
    self._cached_process_mode: ProcessMode | None = None  # cached resolved mode

    # Flags and bookkeeping
    self.unique_name: bool = False
    self._script_error: bool = False
    self._outgoing_connections: list = []  # signals connected via this node's bound methods

    # Property values supplied as keywords go through setattr; anything else is reported.
    declared = self.get_properties()
    for key, val in kwargs.items():
        if key not in declared:
            warnings.warn(f"{type(self).__name__}: unknown kwarg {key!r}", stacklevel=2)
            continue
        setattr(self, key, val)
@property
def name(self) -> str:
    """The node's current name."""
    return self._name
[docs]
@name.setter
def name(self, value: str):
old = self._name
self._name = value
parent = getattr(self, 'parent', None)
if parent is not None:
names = parent.children._names
if old and names.get(old) is self:
del names[old]
if value:
names[value] = self
@property
def process_mode(self) -> ProcessMode:
    """The mode assigned to this node itself (may be ``INHERIT``, i.e. unresolved)."""
    return self._process_mode
[docs]
@process_mode.setter
def process_mode(self, value: ProcessMode):
    """Store the new mode and invalidate the cached resolved mode."""
    self._process_mode = value
    # Descendants that inherit have their caches cleared as well.
    self._invalidate_process_mode_cache()
@property
def visible(self) -> bool:
    """This node's own visibility flag (does not account for ancestors)."""
    return self._visible
[docs]
@visible.setter
def visible(self, value: bool):
    """Set the visibility flag, update the subtree, and notify.

    The value is coerced to ``bool``. Nothing happens when it equals the
    current flag; otherwise effective visibility is re-propagated and a
    ``VISIBILITY_CHANGED`` notification is dispatched to this node.
    """
    flag = bool(value)
    if flag == self._visible:
        return
    self._visible = flag
    # A parentless node is treated as having a visible parent.
    if self.parent is None:
        inherited = True
    else:
        inherited = self.parent._visible_in_hierarchy
    self._propagate_visibility(inherited)
    self._notification(Notification.VISIBILITY_CHANGED)
def _propagate_visibility(self, parent_effective: bool) -> None:
"""Update _visible_in_hierarchy for self and descendants.
Propagates the effective visibility down the subtree; prunes branches
whose effective state didn't change so toggles cost O(changed-subtree)
rather than O(whole-subtree).
"""
new_effective = parent_effective and self._visible
if self._visible_in_hierarchy == new_effective:
return
self._visible_in_hierarchy = new_effective
for child in self.children:
child._propagate_visibility(new_effective)
def _invalidate_process_mode_cache(self):
    """Drop the cached resolved mode here and in every inheriting descendant.

    Children with an explicit (non-``INHERIT``) mode are left alone, along
    with everything beneath them.
    """
    self._cached_process_mode = None
    inheriting = (kid for kid in self.children if kid._process_mode == ProcessMode.INHERIT)
    for kid in inheriting:
        kid._invalidate_process_mode_cache()
def _notification(self, what: Notification) -> None:
    """Called when a notification is dispatched. Override to handle.

    Args:
        what: The notification being delivered. The base implementation
            does nothing.
    """
[docs]
def reset_error(self) -> None:
    """Clear script error flag to re-enable processing."""
    # While this flag is set, _safe_call and the process recursions skip the node.
    self._script_error = False
def _safe_call(self, method, *args: Any) -> None:
"""Call a lifecycle method with error recovery."""
if self._script_error:
return
try:
method(*args)
except AssertionError:
raise
except Exception:
if Node.strict_errors:
raise
self._script_error = True
import sys
import traceback
tb = traceback.format_exc()
# Always print to stderr so errors are never invisible
print(f"Script error in {self.name}.{method.__name__} — node disabled:\n{tb}", file=sys.stderr)
log.error("Script error in %s.%s — node disabled", self.name, method.__name__)
try:
Node.script_error_raised.emit(self, method.__name__, tb)
except Exception:
# justified: signal-handler errors must not derail error recovery itself
pass
[docs]
def add_child(self, node: Node) -> Node:
    """Attach *node* as a child, detaching it from any current parent first.

    Args:
        node: The node to add.

    Returns:
        The node that was added.

    After attaching, the node receives a ``PARENTED`` notification and its
    process-mode cache, effective visibility and (if present) transform
    are refreshed. If this node is in a tree, the child enters the tree
    and its ready pass runs.
    """
    previous = node.parent
    if previous:
        previous.remove_child(node)
    node.parent = self
    self.children._add(node)

    node._notification(Notification.PARENTED)
    node._invalidate_process_mode_cache()
    node._propagate_visibility(self._visible_in_hierarchy)
    # Only some node types define a transform to invalidate.
    if hasattr(node, '_invalidate_transform'):
        node._invalidate_transform()

    tree = self._tree
    if tree:
        tree._structure_version += 1
        node._enter_tree(tree)
        node._ready_recursive()
    return node
[docs]
def remove_child(self, node: Node):
    """Detach *node* from this node; does nothing if it is not a child.

    When this node is in a tree, the child's exit pass runs before it is
    detached. The child then receives ``UNPARENTED``, loses its parent
    reference, and has its process-mode cache and effective visibility
    recomputed as for a parentless node.
    """
    if node not in self.children:
        return
    tree = self._tree
    if tree:
        tree._structure_version += 1
        node._exit_tree()
    self.children._remove(node)
    node._notification(Notification.UNPARENTED)
    node.parent = None
    node._invalidate_process_mode_cache()
    node._propagate_visibility(True)
[docs]
def reparent(self, new_parent: Node):
"""Remove from current parent and add to new_parent."""
if self.parent:
self.parent.remove_child(self)
new_parent.add_child(self)
[docs]
def get_node(self, path: str) -> Node:
"""Navigate tree by path: 'Child/GrandChild' or '../Sibling'."""
current = self
parts = [p for p in path.split('/') if p]
if path.startswith('/'):
while current.parent:
current = current.parent
# An absolute path may optionally name the root as its first segment
# (e.g. '/Root/Player'). Consume it once so it resolves to root.
if parts and parts[0] == current.name:
parts.pop(0)
for part in parts:
if part == '..':
current = current.parent
if current is None:
raise ValueError("Already at root")
else:
current = current.children[part]
return current
[docs]
def find_child(self, name: str, recursive: bool = False) -> Node | None:
"""Find first child with the given name."""
for c in self.children:
if c.name == name:
return c
if recursive:
found = c.find_child(name, recursive=True)
if found:
return found
return None
[docs]
def find(self, node_type: type, recursive: bool = True) -> Node | None:
"""Find first descendant of type (recursive by default)."""
for child in self.children:
if isinstance(child, node_type):
return child
if recursive:
found = child.find(node_type, recursive=True)
if found:
return found
return None
[docs]
def find_all(self, node_type: type, recursive: bool = True) -> list:
"""Find all descendants of type."""
result = []
for child in self.children:
if isinstance(child, node_type):
result.append(child)
if recursive:
result.extend(child.find_all(node_type))
return result
[docs]
def walk(self, *, include_self: bool = True) -> Iterator[Node]:
    """Iterate this node and all descendants in DFS pre-order.

    Args:
        include_self: When false, this node itself is not yielded; its
            descendants still are.
    """
    if include_self:
        yield self
    for child in self.children:
        # Descendants always include themselves, regardless of include_self here.
        yield from child.walk(include_self=True)
[docs]
@property
def path(self) -> str:
if self.parent is None:
return f"/{self.name}"
return f"{self.parent.path}/{self.name}"
# --- Groups ---
[docs]
def add_to_group(self, group: str):
"""Add this node to a named group."""
self._groups.add(group)
if self._tree:
self._tree._group_add(group, self)
[docs]
def remove_from_group(self, group: str):
"""Remove this node from a named group."""
self._groups.discard(group)
if self._tree:
self._tree._group_remove(group, self)
[docs]
def is_in_group(self, group: str) -> bool:
    """Check if this node belongs to a named group."""
    # Membership is tracked on the node itself, so this works outside a tree too.
    return group in self._groups
# --- Lifecycle (override in subclasses) ---
[docs]
def ready(self) -> None:
    """Called once after the node and all its children enter the scene tree.

    Override to perform initialisation that requires the scene tree --
    finding sibling nodes, connecting signals, spawning children. The
    ``tree`` property is available. Called after ``enter_tree()`` and
    after all children's ``ready()``.

    Note:
        Fires again if the node is removed and re-added to the tree.

    Example::

        def ready(self):
            self.sprite = self.get_node("Sprite")
            self.health_changed.connect(self._update_hud)
    """
[docs]
def enter_tree(self) -> None:
    """Called when the node enters the scene tree, before ``ready()``.

    Override for setup that must happen the moment the tree reference
    becomes available. Children have not entered yet at this point, so
    avoid querying child nodes here -- use ``ready()`` instead.

    Example::

        def enter_tree(self):
            self.add_to_group("enemies")
    """
[docs]
def exit_tree(self) -> None:
    """Called when the node is about to leave the scene tree.

    Override to clean up resources, disconnect external signals, or
    persist state. Children have already exited by the time this fires
    on the parent.

    Example::

        def exit_tree(self):
            self.save_progress()
            self.remove_from_group("enemies")
    """
[docs]
def process(self, dt: float) -> None:
    """Called every frame for game logic.

    Override for movement, AI, animation triggers, or any per-frame
    update. Obeys ``process_mode`` -- disabled or paused nodes are
    skipped automatically.

    Args:
        dt: Seconds elapsed since the previous frame (variable timestep).

    Example::

        def process(self, dt):
            self.position += self.velocity * dt
    """
[docs]
def physics_process(self, dt: float) -> None:
    """Called at a fixed timestep (default 60 Hz) for physics logic.

    Override for deterministic physics updates -- forces, collision
    responses, rigid-body integration. Runs independently of the
    render frame rate.

    Args:
        dt: Fixed time step in seconds (e.g. 1/60).

    Example::

        def physics_process(self, dt):
            self.velocity += self.gravity * dt
            self.move_and_slide()
    """
[docs]
def draw(self, renderer) -> None:
    """Called each frame for custom 2D drawing.

    Override to issue immediate-mode draw calls such as ``draw_line``,
    ``draw_rect``, or ``draw_text``. Called only when ``visible`` is
    ``True``.

    Args:
        renderer: The active draw-command recorder (e.g. ``Draw2D``).

    Example::

        def draw(self, renderer):
            renderer.draw_circle(self.world_position, 10, colour=(1, 0, 0, 1))
    """
[docs]
def picked(self, event: InputEvent) -> None:
    """Called when a 3D mouse-pick event hits this node's collision shape.

    Override to react to direct interaction with this 3D object --
    selection, dragging, context menus.

    Args:
        event: The input event containing click position, camera ray, etc.

    Example::

        def picked(self, event):
            if event.button == MouseButton.LEFT:
                self.selected = True
    """
# --- Coroutine support ---
[docs]
def start_coroutine(self, gen: Coroutine) -> CoroutineHandle:
    """Register a generator coroutine to run each frame. Returns a cancellable handle."""
    # The generator is wrapped; the wrapper (not the raw generator) is what gets stored.
    handle = CoroutineHandle(gen)
    self._coroutines.append(handle)
    return handle
[docs]
def stop_coroutine(self, gen_or_handle):
    """Cancel and unregister a coroutine given its handle or its generator.

    Args:
        gen_or_handle: A ``CoroutineHandle``, or the generator object that
            was registered. Unknown values are ignored.
    """
    if isinstance(gen_or_handle, CoroutineHandle):
        gen_or_handle.cancel()
        if gen_or_handle in self._coroutines:
            self._coroutines.remove(gen_or_handle)
        return
    # Legacy: search by generator
    for entry in self._coroutines:
        wraps_target = isinstance(entry, CoroutineHandle) and entry._gen is gen_or_handle
        if wraps_target:
            entry.cancel()
        if wraps_target or entry is gen_or_handle:
            # Returning immediately keeps the in-loop removal safe.
            self._coroutines.remove(entry)
            return
def _tick_coroutines(self, dt: float):
    """Advance every registered coroutine by one step and drop finished ones.

    Args:
        dt: Frame delta; accepted for signature symmetry but not passed to
            the generators, which are advanced with plain ``next()``.

    Cancelled handles are skipped and removed. A generator that raises
    ``StopIteration`` is removed; any other exception propagates. The
    live list is iterated, so entries appended during a step are advanced
    in the same pass.
    """
    if not self._coroutines:
        return
    finished = []
    for item in self._coroutines:
        if isinstance(item, CoroutineHandle):
            if item.is_cancelled:
                finished.append(item)
                continue
            gen = item._gen
        else:
            # Legacy entry: a raw generator stored without a handle.
            gen = item
        try:
            next(gen)
        except StopIteration:
            finished.append(item)
    for item in finished:
        # A coroutine may already have unregistered itself (stop_coroutine
        # called from inside its own step); an unguarded remove() would
        # then raise ValueError.
        if item in self._coroutines:
            self._coroutines.remove(item)
# --- Tree internals ---
def _enter_tree(self, tree: SceneTree):
    """Attach this node and its subtree to *tree*.

    Steps, in order: store the tree reference; instantiate any declared
    child descriptors not yet present on the instance; register the
    unique name and groups with the tree; dispatch ``ENTER_TREE`` and call
    ``enter_tree()``; then recurse into the children.
    """
    self._tree = tree

    # Instantiate declared Child descriptors
    declared = getattr(type(self), '_declared_children', None)
    for attr_name, child_desc in (declared.items() if declared else ()):
        if self.__dict__.get(attr_name) is not None:
            continue  # already instantiated or explicitly assigned
        ctor_kwargs = dict(child_desc._kwargs)
        if 'name' not in ctor_kwargs:
            ctor_kwargs['name'] = attr_name
        created = child_desc._type(*child_desc._args, **ctor_kwargs)
        self.__dict__[attr_name] = created
        self.children._add(created)
        created.parent = self

    if self.unique_name:
        tree._unique_nodes[self.name] = self
    for group in self._groups:
        tree._group_add(group, self)

    self._notification(Notification.ENTER_TREE)
    self.enter_tree()
    for child in self.children:
        child._enter_tree(tree)
def _exit_tree(self):
    """Detach this node and its subtree from the scene tree.

    Children exit first. The node then receives ``EXIT_TREE`` and its
    ``exit_tree()`` hook, is unregistered from the tree's unique-name and
    group tables, and finally drops its tree reference.
    """
    for child in self.children:
        child._exit_tree()
    self._notification(Notification.EXIT_TREE)
    self.exit_tree()
    tree = self._tree
    if tree:
        if self.unique_name:
            tree._unique_nodes.pop(self.name, None)
        for group in self._groups:
            tree._group_remove(group, self)
    self._tree = None
def _ready_recursive(self):
    """Run the ready pass bottom-up: children first, then this node."""
    for child in self.children:
        child._ready_recursive()
    self._notification(Notification.READY)
    # ready() is user code, so it goes through the error-recovery wrapper.
    self._safe_call(self.ready)
def _effective_process_mode(self) -> ProcessMode:
    """Return the mode with ``INHERIT`` resolved through the ancestors (cached).

    A node without a parent that inherits resolves to ``PAUSABLE``.
    """
    if self._cached_process_mode is not None:
        return self._cached_process_mode
    resolved = self._process_mode
    if resolved == ProcessMode.INHERIT:
        owner = self.parent
        resolved = owner._effective_process_mode() if owner else ProcessMode.PAUSABLE
    self._cached_process_mode = resolved
    return resolved
def _can_process(self, paused: bool) -> bool:
    """Tell whether this node should process for the given pause state.

    Args:
        paused: Whether the tree is currently paused.
    """
    mode = self._effective_process_mode()
    if mode == ProcessMode.DISABLED:
        return False
    if mode == ProcessMode.ALWAYS:
        return True
    # PAUSED_ONLY runs only while paused; PAUSABLE (or resolved
    # INHERIT → PAUSABLE) runs only while not paused.
    return paused if mode == ProcessMode.PAUSED_ONLY else not paused
def _process_recursive(self, dt: float, paused: bool = False):
    """Run the per-frame update for this node, then for its children.

    Args:
        dt: Frame delta passed to ``process`` and to the children.
        paused: Whether the tree is paused.

    A node whose error flag is set is skipped together with its subtree.
    When ``process`` raises and ``Node.strict_errors`` is false, the node
    is flagged, the error is reported, and the method returns without
    visiting the children.
    """
    if self._script_error:
        return
    # Inlined _can_process: resolve mode from cache and check pause state
    mode = self._cached_process_mode
    if mode is None:
        mode = self._effective_process_mode()
    if mode != ProcessMode.DISABLED and (
        mode == ProcessMode.ALWAYS
        or (not paused if mode == ProcessMode.PAUSABLE else paused)
    ):
        self._notification(Notification.PROCESS)
        try:
            self.process(dt)
        except AssertionError:
            raise
        except Exception:
            if Node.strict_errors:
                raise
            self._script_error = True
            import sys
            import traceback
            tb = traceback.format_exc()
            print(f"Script error in {self.name}.process — node disabled:\n{tb}", file=sys.stderr)
            log.error("Script error in %s.process — node disabled", self.name)
            try:
                Node.script_error_raised.emit(self, "process", tb)
            except Exception:
                # justified: signal-handler error during process; node already disabled, recursion continues
                pass
            return
        if self._coroutines:
            self._tick_coroutines(dt)
    # safe_iter() is used so children can be visited while the collection may change.
    for child in self.children.safe_iter():
        child._process_recursive(dt, paused)
def _physics_process_recursive(self, dt: float, paused: bool = False):
    """Run the fixed-step update for this node, then for its children.

    Args:
        dt: Step duration passed to ``physics_process`` and the children.
        paused: Whether the tree is paused.

    A node whose error flag is set is skipped together with its subtree.
    When ``physics_process`` raises and ``Node.strict_errors`` is false,
    the node is flagged, the error is reported, and the method returns
    without visiting the children.
    """
    if self._script_error:
        return
    # Inlined _can_process: resolve mode from cache and check pause state
    mode = self._cached_process_mode
    if mode is None:
        mode = self._effective_process_mode()
    if mode != ProcessMode.DISABLED and (
        mode == ProcessMode.ALWAYS
        or (not paused if mode == ProcessMode.PAUSABLE else paused)
    ):
        self._notification(Notification.PHYSICS_PROCESS)
        try:
            self.physics_process(dt)
        except AssertionError:
            raise
        except Exception:
            if Node.strict_errors:
                raise
            self._script_error = True
            import sys
            import traceback
            tb = traceback.format_exc()
            print(f"Script error in {self.name}.physics_process — node disabled:\n{tb}", file=sys.stderr)
            log.error("Script error in %s.physics_process — node disabled", self.name)
            try:
                Node.script_error_raised.emit(self, "physics_process", tb)
            except Exception:
                # justified: signal-handler error during physics_process; node already disabled, recursion continues
                pass
            return
    # safe_iter() is used so children can be visited while the collection may change.
    for child in self.children.safe_iter():
        child._physics_process_recursive(dt, paused)
def _draw_recursive(self, renderer):
if not self.visible:
return
if self._script_error:
for child in self.children.safe_iter():
child._draw_recursive(renderer)
return
self._safe_call(self.draw, renderer)
for child in self.children.safe_iter():
child._draw_recursive(renderer)
def _propagate_input(self, event: TreeInputEvent) -> None:
    """Walk tree front-to-back: children first (reversed), then self.

    Calls ``handle_input()`` on each node until ``event.handled`` is set.
    Then calls ``unhandled_input()`` on remaining nodes.
    """
    # NOTE(review): handle_input / unhandled_input are not defined on Node in
    # this part of the file — presumably supplied elsewhere; confirm.
    # The child list is copied before reversing, so iteration uses a snapshot.
    for child in reversed(list(self.children)):
        child._propagate_input(event)
    if not event.handled:
        self.handle_input(event)
        self.unhandled_input(event)
[docs]
def clear_children(self):
"""Destroy all children of this node."""
for child in list(self.children):
child.destroy()
[docs]
def destroy(self):
"""Schedule this node for removal at the end of the current frame.
Signal connections made through this node's bound methods are
proactively disconnected so emitters stop dispatching to it on the
next emit (Godot 4 behaviour). Lazy weak-ref cleanup in
``Signal.__call__`` covers nodes that are GC'd without ``destroy()``.
"""
for conn in list(self._outgoing_connections):
conn.disconnect()
self._outgoing_connections.clear()
if self._tree:
self._tree._queue_delete(self)
[docs]
@property
def app(self):
"""The App running this node's scene tree. Available after enter_tree()."""
return self._tree.app if self._tree else None
[docs]
@property
def tree(self) -> SceneTree | None:
    """The SceneTree this node belongs to, or ``None`` while outside a tree."""
    return self._tree
[docs]
def __getitem__(self, key: str):
    """Shorthand for get_node: ``self["Child/Path"]``."""
    # Raises whatever get_node raises for an unresolvable path.
    return self.get_node(key)
[docs]
@classmethod
def get_properties(cls) -> dict[str, Property]:
    """Return all Property descriptors declared on this node class and its bases."""
    # Classes without a ``__properties__`` attribute yield a fresh empty dict.
    return getattr(cls, '__properties__', {})
[docs]
def __repr__(self):
    """Return ``<ClassName 'node name'>`` for debugging output."""
    return f"<{type(self).__name__} '{self.name}'>"
# Register Node itself (not covered by __init_subclass__ which only fires for subclasses)
Node._registry["Node"] = Node
# ============================================================================
# Timer
# ============================================================================
[docs]
class Timer(Node):
    """Fires timeout signal after duration. Supports one-shot and repeating.

    Attributes:
        duration: Countdown length in seconds.
        one_shot: When true the timer stops after firing once; otherwise
            it re-arms itself by adding ``duration`` to the remaining time.
        autostart: Whether the timer is created already running.
        timeout: Signal emitted each time the countdown reaches zero.
    """

    duration = Property(1.0, range=(0.001, 3600))
    one_shot = Property(True)
    autostart = Property(False)

    def __init__(self, duration: float = 1.0, one_shot: bool = True,
                 autostart: bool = False, **kwargs):
        """Create a timer.

        Args:
            duration: Countdown length in seconds.
            one_shot: Fire once (``True``) or repeat (``False``).
            autostart: Start counting down immediately.
            **kwargs: Forwarded to ``Node.__init__``.
        """
        super().__init__(**kwargs)
        self.duration = duration
        self.one_shot = one_shot
        # Store the flag in its Property as well, like duration and one_shot,
        # so the declared property reflects how the timer was constructed.
        self.autostart = autostart
        self.timeout = Signal()
        self._time_left = duration if autostart else 0.0
        self._running = autostart

    def start(self, duration: float = 0):
        """Start or restart the timer, optionally overriding duration.

        Args:
            duration: New duration in seconds; values ``<= 0`` keep the
                current ``duration``.
        """
        if duration > 0:
            self.duration = duration
        self._time_left = self.duration
        self._running = True

    def stop(self):
        """Stop the timer and reset time_left to zero."""
        self._running = False
        self._time_left = 0.0

    @property
    def stopped(self) -> bool:
        """``True`` while the timer is not counting down."""
        return not self._running

    @property
    def time_left(self) -> float:
        """Seconds remaining until the next timeout (``0.0`` when stopped)."""
        return self._time_left

    def process(self, dt: float):
        """Advance the countdown by *dt* seconds and fire ``timeout`` when due.

        The timer's own state is settled before the signal is emitted, so
        a handler that calls ``start()`` or ``stop()`` has the final say.
        """
        if not self._running:
            return
        self._time_left -= dt
        if self._time_left > 0:
            return
        if self.one_shot:
            self._running = False
            self._time_left = 0.0  # do not expose a negative remainder
        else:
            self._time_left += self.duration
        self.timeout()