Source code for simvx.core.scene_io.scene_module

"""Folder-as-scene editing surface.

This is Tier 4 of the scene I/O layer. A *scene module* is a Python package
whose ``__init__.py`` (or, as a namespaced fallback, ``<folder>/<folder>.py``)
defines the primary :class:`Node` subclass; sibling ``.py`` files inside the
folder hold sub-scenes that the root imports relatively. ``SceneModule`` is
the multi-file analog of :class:`SceneFile`: it owns one root ``SceneFile``
plus zero or more sibling ``SceneFile`` instances, all editable, with
:meth:`save` flushing every dirty file in one pass.
"""

from __future__ import annotations

from pathlib import Path

from .detection import primary_node_class_from_source
from .scene_file import SceneClass, SceneFile

__all__ = ["NotASceneModuleError", "SceneModule"]


class NotASceneModuleError(ValueError):
    """Signal that a folder does not meet the requirements of a scene module."""
# --------------------------------------------------------------------------- # SceneModule # ---------------------------------------------------------------------------
class SceneModule:
    """A folder scene with one root file and any number of sibling files.

    A folder scene is a Python package whose ``__init__.py`` (or a
    ``<folder>/<folder_name>.py`` namespaced fallback) defines the primary
    Node subclass, with sibling ``.py`` files holding sub-scenes that the
    root imports.

    SceneModule is the multi-file analog of SceneFile: it owns one root
    SceneFile plus zero or more sibling SceneFiles, all editable, with
    :meth:`save` flushing every dirty file.
    """

    __slots__ = ("_folder", "_root_path", "_root", "_files", "_to_delete", "_added")

    def __init__(self, folder: Path, root_path: Path, root: SceneFile) -> None:
        """Bind an already-loaded root SceneFile to its folder.

        Args:
            folder: Directory that holds the scene module.
            root_path: Path of the file selected as the root definition.
            root: SceneFile loaded from ``root_path``.
        """
        self._folder = folder
        self._root_path = root_path
        self._root = root
        # Cache keyed by file stem. The root's stem is included so that
        # `mod.file(<root_stem>)` returns the same SceneFile as `mod.root`.
        self._files: dict[str, SceneFile] = {root_path.stem: root}
        # Files marked for deletion on save() (stems only).
        self._to_delete: set[str] = set()
        # Files staged in-memory via add_file() that don't yet exist on disk.
        self._added: set[str] = set()

    # -- constructors --------------------------------------------------------

    @classmethod
    def load(cls, folder: str | Path) -> SceneModule:
        """Open a folder scene.

        Resolution order:

        1. ``folder/__init__.py`` if it defines exactly one Node subclass.
        2. ``folder/<folder_name>.py`` as a namespaced fallback.

        Candidate files are read as UTF-8 (the default encoding of Python
        source) rather than with the locale-dependent default.

        Raises :class:`NotASceneModuleError` if neither matches.
        Raises :class:`AmbiguousSceneError` if either matching file has
        multiple Node subclasses.
        """
        folder_path = Path(folder)
        if not folder_path.is_dir():
            raise NotASceneModuleError(f"{folder_path} is not a directory")

        init = folder_path / "__init__.py"
        namespaced = folder_path / f"{folder_path.name}.py"

        chosen: Path | None = None
        # Candidates are probed in priority order; the first that defines a
        # primary Node subclass wins.
        for candidate in (init, namespaced):
            if not candidate.is_file():
                continue
            text = candidate.read_text(encoding="utf-8")
            # primary_node_class_from_source raises AmbiguousSceneError on
            # multiple matches and returns None for zero.
            if primary_node_class_from_source(text, path=candidate) is not None:
                chosen = candidate
                break

        if chosen is None:
            raise NotASceneModuleError(
                f"{folder_path} is not a scene module: neither {init} nor {namespaced} "
                "defines a Node subclass"
            )
        root = SceneFile.load(chosen)
        return cls(folder_path, chosen, root)

    @staticmethod
    def is_folder_scene(path: str | Path) -> bool:
        """Report whether ``path`` looks like a scene module.

        Checks ``__init__.py`` first and ``<folder_name>.py`` second.
        Returns False for files, missing paths, and folders where neither
        candidate qualifies.
        """
        p = Path(path)
        if not p.is_dir():
            return False
        candidates = (p / "__init__.py", p / f"{p.name}.py")
        return any(c.is_file() and _file_qualifies(c) for c in candidates)

    # -- properties ----------------------------------------------------------

    @property
    def folder(self) -> Path:
        """The folder path."""
        return self._folder

    @property
    def root(self) -> SceneFile:
        """SceneFile for the root scene definition (whichever of
        ``__init__.py`` or ``<folder_name>.py`` was selected).
        """
        return self._root

    @property
    def root_path(self) -> Path:
        """Path to the root file selected by the resolver."""
        return self._root_path

    # -- file management -----------------------------------------------------

    def files(self) -> list[SceneFile]:
        """Return every cached SceneFile (the root plus any opened or staged
        sub-scenes), ordered by file stem.
        """
        return [self._files[stem] for stem in sorted(self._files)]

    def file(self, name: str) -> SceneFile:
        """Open (or return cached) the sibling file ``<name>.py`` as a SceneFile.

        ``name`` is the module stem, not a full path.

        Raises :class:`FileNotFoundError` if the file doesn't exist, or if it
        has been marked for deletion via :meth:`remove_file` and the
        deletion has not been saved yet.
        """
        if name in self._files:
            return self._files[name]
        path = self._folder / f"{name}.py"
        # A stem pending deletion is logically gone. Re-opening it would
        # re-cache a file that save() is about to unlink (and possibly write
        # first), so refuse instead.
        if name in self._to_delete:
            raise FileNotFoundError(f"{path} is marked for deletion in scene module {self._folder}")
        if not path.is_file():
            raise FileNotFoundError(f"{path} not found in scene module {self._folder}")
        sf = SceneFile.load(path)
        self._files[name] = sf
        return sf

    def add_file(self, name: str, source: str) -> SceneFile:
        """Create a new sibling file ``<name>.py`` with the given source.

        Returns the cached SceneFile. The file is staged in memory until
        :meth:`save`.

        Raises :class:`FileExistsError` if the file already exists on disk or
        a file with that stem is already cached in this module.
        """
        path = self._folder / f"{name}.py"
        if path.exists():
            raise FileExistsError(f"{path} already exists")
        if name in self._files:
            raise FileExistsError(f"{name}.py is already staged in this SceneModule")
        sf = SceneFile.from_source(source, path=path)
        self._files[name] = sf
        self._added.add(name)
        # If a previous remove_file() targeted this stem, cancel it.
        self._to_delete.discard(name)
        return sf

    def remove_file(self, name: str) -> None:
        """Mark a sibling file for deletion on :meth:`save`.

        The corresponding SceneFile cache entry is dropped immediately. A
        file that was only staged via :meth:`add_file` is simply forgotten.

        Raises :class:`ValueError` when ``name`` is the root file's stem.
        Raises :class:`FileNotFoundError` if the file is neither on disk,
        staged via :meth:`add_file`, nor cached.
        """
        if name == self._root_path.stem:
            raise ValueError(f"cannot remove root scene file {name!r}")
        path = self._folder / f"{name}.py"
        on_disk = path.is_file()
        staged = name in self._added
        cached = name in self._files
        if not on_disk and not staged and not cached:
            raise FileNotFoundError(f"{path} not found in scene module {self._folder}")
        # Drop cache immediately.
        self._files.pop(name, None)
        if staged:
            # Was added in-memory only; just discard.
            self._added.discard(name)
            return
        self._to_delete.add(name)

    # -- split / inline ------------------------------------------------------

    def split_child(self, child_var: str, into_file: str) -> SceneFile:
        """Extract the child constructed by ``<child_var>`` in the root's
        ``__init__`` into a sibling file ``<into_file>.py``.

        The new file contains a Node subclass whose body matches the original
        child construction *plus* any subsequent ``__init__`` statements that
        reference ``<child_var>`` (e.g. ``<child_var>.add_child(Weapon())`` or
        ``<child_var>.scale = ...``). The root's child line becomes
        ``<child_var> = <NewClass>()`` and a relative import is added; the
        lifted statements are removed from the root's ``__init__``.

        Refuses (raises :class:`ValueError`) if any lift candidate:

        - lives inside a loop / conditional / try / with block
        - depends on a helper variable assigned earlier in ``__init__`` (the
          user must inline the helper into the lifted construction first, or
          restructure manually).

        Returns the new SceneFile. Raises :class:`ValueError` if
        ``<child_var>`` is not in the root, and :class:`FileExistsError` if
        ``<into_file>.py`` already exists.
        """
        sc = self._root.scene_class()
        if not sc.has_child(child_var):
            raise ValueError(f"child variable {child_var!r} not found in root {sc.name}")
        if (self._folder / f"{into_file}.py").exists() or into_file in self._files:
            raise FileExistsError(f"{into_file}.py already exists in {self._folder}")

        type_name, kwargs = self._read_child_construction(sc, child_var)
        new_class_name = _stem_to_class_name(into_file)

        # Collect lift candidates: every __init__ statement after the
        # var's assignment that references <child_var>. Refuses if any
        # candidate lives in a control-flow construct or depends on
        # an unsupplied helper var. Everything that can refuse runs before
        # the new file is staged.
        lift_blocks = self._collect_lift_statements(sc, child_var)

        # Build the new file's source. Subclass the original type and
        # apply kwargs in __init__ via super().__init__, then re-emit
        # the lifted statements with `<child_var>.` rewritten to `self.`.
        new_source = _build_subclass_source(
            new_class_name,
            type_name,
            kwargs,
            lifted_body=_lifted_body_from_blocks(lift_blocks, child_var),
        )
        new_file = self.add_file(into_file, new_source)

        # Replace the inline construction: <child_var> = <NewClass>()
        self._replace_child_construction(sc, child_var, new_class_name)
        # Remove the lifted statements from the root's __init__.
        self._remove_lifted_statements(sc, lift_blocks)
        # Add the relative import from the root.
        self._root.imports.ensure(new_class_name, from_=f".{into_file}")
        return new_file

    def inline_file(self, child_path: str) -> None:
        """Inverse of :meth:`split_child`.

        Inline the named sub-scene back into the root: replace the
        ``<var> = <CustomClass>()`` line with a construction call (and child
        kwargs) matching the inlined class, remove the relative import, and
        :meth:`remove_file` the named file.

        Raises :class:`ValueError` if the named class is not currently used
        by the root, or if the file is not a clean leaf scene module (e.g.
        has procedural construction).
        """
        target_file = self.file(child_path)
        target_class = target_file.scene_class()
        new_class_name = target_class.name

        # Find the inline `<var> = NewClass()` construction in the root.
        sc = self._root.scene_class()
        var_name = self._find_child_using_class(sc, new_class_name)
        if var_name is None:
            raise ValueError(f"class {new_class_name!r} from {child_path}.py is not used by the root scene")

        # Recover the original (super_type, kwargs) from the subclass body.
        super_type, kwargs = self._extract_subclass_construction(target_class)
        if super_type is None:
            raise ValueError(
                f"cannot inline {child_path}.py: {new_class_name!r} is not a clean leaf "
                "subclass (no super().__init__ or unsupported construction)"
            )

        # Rewrite the construction line: <var> = <super_type>(<kwargs>).
        self._replace_child_construction_full(sc, var_name, super_type, kwargs)
        # Drop the relative import, ensure the super_type is imported.
        self._root.imports.remove(new_class_name, from_=f".{child_path}")
        self._root.imports.ensure(super_type, from_="simvx.core")
        # Mark the file for deletion.
        self.remove_file(child_path)

    # -- save / dirty --------------------------------------------------------

    def save(self) -> list[Path]:
        """Write every dirty/added file and delete every removed file.

        Returns the list of paths written or removed: written files first
        (ordered by stem), then removed files (ordered by stem). A
        SceneModule with no edits writes nothing and returns ``[]``.

        Raises :class:`ValueError` if a file that needs writing has no path.
        """
        written: list[Path] = []

        # Write all dirty SceneFiles first (stable order by stem).
        for stem in sorted(self._files):
            sf = self._files[stem]
            if stem not in self._added and not sf.is_dirty():
                continue
            if sf.path is None:
                raise ValueError(f"SceneFile for {stem!r} has no path")
            sf.save()
            written.append(sf.path)
        # Reset added bookkeeping; on subsequent saves these are normal files.
        self._added.clear()

        # Now process deletions.
        for stem in sorted(self._to_delete):
            path = self._folder / f"{stem}.py"
            if path.is_file():
                path.unlink()
                written.append(path)
        self._to_delete.clear()
        return written

    # -- diagnostics ---------------------------------------------------------

    def assert_idempotent(self) -> None:
        """Test helper: assert that nothing is staged or pending and that
        every cached file passes its own ``assert_idempotent`` check.

        Raises :class:`AssertionError` on a staged-but-unsaved file or on
        pending deletions.
        """
        for stem in sorted(self._files):
            sf = self._files[stem]
            if stem in self._added:
                raise AssertionError(f"SceneModule has staged file {stem!r} not yet saved")
            sf.assert_idempotent()
        if self._to_delete:
            raise AssertionError(f"SceneModule has pending deletions: {sorted(self._to_delete)}")

    # -- internal helpers ----------------------------------------------------

    @staticmethod
    def _assigned_name(stmt) -> str | None:
        """Return the target of a top-level ``<name> = ...`` statement, else None."""
        from parso.tree import Leaf

        if stmt.type != "simple_stmt":
            return None
        inner = stmt.children[0] if stmt.children else None
        if inner is None or inner.type != "expr_stmt":
            return None
        target = inner.children[0]
        if isinstance(target, Leaf) and target.type == "name":
            return target.value
        return None

    @staticmethod
    def _keyword_arguments(trailer) -> list[tuple[str, str]]:
        """Return ``[(name, value_source), ...]`` for the named arguments of a
        call trailer. Arguments without a name (positional, ``*args``,
        ``**kwargs``) are skipped.
        """
        from .scene_file import _arglist_arguments, _argument_name

        pairs: list[tuple[str, str]] = []
        for arg in _arglist_arguments(trailer):
            name = _argument_name(arg)
            if name is None:
                continue
            # children of a named argument: [name, '=', value]
            pairs.append((name, arg.children[2].get_code().strip()))
        return pairs

    def _collect_lift_statements(self, sc: SceneClass, child_var: str) -> list:
        """Return the list of parso simple_stmt nodes that should be lifted.

        Lift candidates are statements that:

        - appear after ``<child_var>``'s assignment in the root's
          ``__init__`` body,
        - syntactically reference ``<child_var>`` as the head name of an
          attribute access (``<child_var>.add_child(...)`` etc.),
        - sit at suite top-level (NOT inside a loop / conditional / try /
          with),
        - reference no other helper variable assigned in the same
          ``__init__`` (the lift would otherwise be incomplete; we refuse
          rather than guess what to do with the helper).

        ``self.add_child(<child_var>)`` is *not* a lift candidate — that's the
        parent scene's responsibility, kept as-is.

        Returns ``[]`` when no assignment to ``<child_var>`` is found. Raises
        :class:`ValueError` when a statement that would need lifting cannot
        be lifted safely.
        """
        suite = sc._init_suite()
        statements = list(suite.children)
        assigned = [self._assigned_name(stmt) for stmt in statements]

        # Locate the child's (first) assignment statement.
        try:
            start_idx = assigned.index(child_var)
        except ValueError:
            return []

        # Local-var names assigned anywhere in the suite. Lifted statements
        # must not reference any of these (other than the child_var itself
        # and `self`).
        local_var_names: set[str] = {child_var}
        local_var_names.update(name for name in assigned if name is not None)

        lift_candidates: list = []
        for stmt in statements[start_idx + 1 :]:
            if stmt.type != "simple_stmt":
                # Compound statements (if/for/while/try/with) at top-level
                # only matter if they actually touch child_var. Refuse on
                # any reference inside a control-flow block.
                if _stmt_references_name(stmt, child_var):
                    raise ValueError(
                        f"split_child: cannot lift statement at line "
                        f"{stmt.get_first_leaf().start_pos[0]}: "
                        f"references {child_var!r} inside a control-flow block"
                    )
                continue
            if not stmt.children:
                continue
            # Skip self.add_child(<child_var>) — parent's responsibility.
            if _is_self_add_child_for(stmt, child_var):
                continue
            # Lift candidate: the statement uses <child_var> as the head
            # of an attribute access (e.g. `<child_var>.add_child(...)`).
            if not _stmt_uses_var_as_head(stmt, child_var):
                continue
            # Helper-var dependency check: any other local-var name
            # appearing in this statement (other than child_var) means we
            # cannot lift cleanly.
            extras = _foreign_local_refs(stmt, local_var_names, exclude={child_var})
            if extras:
                raise ValueError(
                    f"split_child: cannot lift statement at line "
                    f"{stmt.get_first_leaf().start_pos[0]}: "
                    f"depends on helper variable(s) {sorted(extras)!r} -- "
                    "inline the helper into the construction or restructure manually"
                )
            lift_candidates.append(stmt)
        return lift_candidates

    def _remove_lifted_statements(self, sc: SceneClass, lift_blocks: list) -> None:
        """Remove each statement in ``lift_blocks`` from the parent suite.

        Delegates to :func:`simvx.core.scene_io.edits.remove_node` for each
        statement. ``sc`` is accepted for signature symmetry and is unused.
        """
        from . import edits

        for stmt in lift_blocks:
            edits.remove_node(stmt)

    def _read_child_construction(self, sc: SceneClass, child_var: str) -> tuple[str, list[tuple[str, str]]]:
        """Return (type_name, [(kwarg_name, kwarg_value_text), ...]) for the
        construction line ``<child_var> = <type_name>(<kwargs>)``.

        Raises :class:`ValueError` when the assignment is missing or its
        right-hand side is not a call headed by a plain name.
        """
        from parso.tree import Leaf

        assignment = sc._find_child_assignment(child_var)
        if assignment is None:
            raise ValueError(f"child {child_var!r} not found")
        expr_stmt = assignment.children[0]
        rhs = expr_stmt.children[2]
        if rhs.type != "atom_expr" or len(rhs.children) < 2:
            raise ValueError(f"child {child_var!r} construction is not a simple call")
        head = rhs.children[0]
        if not isinstance(head, Leaf) or head.type != "name":
            raise ValueError(f"child {child_var!r} construction has no type name")
        # The last trailer is taken to be the call's argument list.
        return head.value, self._keyword_arguments(rhs.children[-1])

    def _replace_child_construction(self, sc: SceneClass, child_var: str, new_type_name: str) -> None:
        """Rewrite ``<child_var> = <Type>(<kwargs>)`` to
        ``<child_var> = <new_type_name>()``.
        """
        # Same rewrite as the kwargs-carrying variant, with no kwargs.
        self._replace_child_construction_full(sc, child_var, new_type_name, [])

    def _replace_child_construction_full(
        self,
        sc: SceneClass,
        child_var: str,
        new_type_name: str,
        kwargs: list[tuple[str, str]],
    ) -> None:
        """Rewrite ``<child_var> = <X>(...)`` to
        ``<child_var> = <new_type_name>(<kwargs>)``.

        Raises :class:`ValueError` when ``<child_var>`` has no assignment.
        """
        from parso.tree import Leaf as _Leaf

        from .source_tree import parse_snippet

        assignment = sc._find_child_assignment(child_var)
        if assignment is None:
            raise ValueError(f"child {child_var!r} not found")
        expr_stmt = assignment.children[0]
        rhs = expr_stmt.children[2]

        kwargs_str = ", ".join(f"{k}={v}" for k, v in kwargs)
        new_rhs = parse_snippet(f"x = {new_type_name}({kwargs_str})\n").children[0].children[2]

        # Inherit the leading prefix of the old RHS (typically a single space).
        old_prefix = rhs.prefix if isinstance(rhs, _Leaf) else rhs.get_first_leaf().prefix
        new_first = new_rhs if isinstance(new_rhs, _Leaf) else new_rhs.get_first_leaf()
        new_first.prefix = old_prefix

        # Splice the freshly parsed node into the existing statement.
        new_rhs.parent = expr_stmt
        expr_stmt.children[2] = new_rhs

    def _find_child_using_class(self, sc: SceneClass, class_name: str) -> str | None:
        """Find the child var whose construction calls ``class_name(...)``.

        Returns the first matching variable name, or None.
        """
        for var in sc.child_var_names():
            assignment = sc._find_child_assignment(var)
            if assignment is None:
                continue
            expr_stmt = assignment.children[0]
            if len(expr_stmt.children) < 3:
                continue
            rhs = expr_stmt.children[2]
            if rhs.type != "atom_expr" or not rhs.children:
                continue
            head = rhs.children[0]
            if getattr(head, "type", None) == "name" and head.value == class_name:
                return var
        return None

    def _extract_subclass_construction(self, target_class: SceneClass) -> tuple[str | None, list[tuple[str, str]]]:
        """For a clean leaf class ``class X(<Super>): super().__init__(**kwargs,
        k=v, ...)``, return ``(<Super>, [(k, v), ...])``. Excludes ``**kwargs``
        passthrough. Returns ``(None, [])`` if the class doesn't fit this shape.
        """
        from parso.tree import Leaf

        cls_node = target_class.node
        # Class header children: ['class', name, '(', arglist|name, ')', ':', suite].
        # Only a single plain-name base appears as a direct name leaf; several
        # bases or a dotted base are wrapped in another node and yield no match.
        base_node = None
        for c in cls_node.children:
            if isinstance(c, Leaf) and c.type == "name" and c is not cls_node.name:
                base_node = c
                break
        if base_node is None:
            return None, []
        super_type = base_node.value

        # Recover kwargs from the body of __init__: super().__init__(**kwargs, k=v, ...).
        trailer = target_class._super_init_trailer()
        if trailer is None:
            # NOTE(review): this path returns the base type without running
            # the leaf check below — confirm that is intended.
            return super_type, []
        kwargs = self._keyword_arguments(trailer)

        # If the class body has any `add_child(...)` calls, this is not a
        # leaf — we don't support inlining sub-trees in v1.
        if target_class.child_var_names():
            return None, []
        return super_type, kwargs
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _file_qualifies(path: Path) -> bool:
    """True iff ``path`` contains exactly one Node subclass and is readable.

    The file is read as UTF-8. A file that cannot be read *or decoded*
    counts as not qualifying, so the check returns False instead of raising.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is a ValueError, not an OSError, so it has to be
        # named here or an undecodable file would escape as an exception.
        return False
    try:
        return primary_node_class_from_source(text, path=path) is not None
    except ValueError:
        # AmbiguousSceneError: multiple Node subclasses → not a clean module.
        return False


def _stem_to_class_name(stem: str) -> str:
    """Convert ``"my_thing"`` to ``"MyThing"`` for use as a class name.

    Hyphens are treated like underscores. Only the first character of each
    part is upper-cased; the rest is kept as written.
    """
    parts = [p for p in stem.replace("-", "_").split("_") if p]
    if not parts:
        return stem.capitalize() or "Scene"
    return "".join(p[:1].upper() + p[1:] for p in parts)


def _build_subclass_source(
    class_name: str,
    super_type: str,
    kwargs: list[tuple[str, str]],
    *,
    lifted_body: str = "",
) -> str:
    """Emit a minimal subclass file: ``from simvx.core import <super_type>`` +
    ``class <class_name>(<super_type>): def __init__(self, **kwargs):
    super().__init__(**kwargs, k=v, ...)``.

    When ``lifted_body`` is non-empty, its contents (already 8-space indented
    to sit inside ``__init__``) are appended after the
    ``super().__init__(...)`` line. Used for nested ``split_child`` to
    transplant statements like ``self.add_child(Weapon())`` into the new
    class.
    """
    super_call_args = "**kwargs"
    if kwargs:
        super_call_args += ", " + ", ".join(f"{k}={v}" for k, v in kwargs)
    body = f"        super().__init__({super_call_args})\n"
    if lifted_body:
        # Guarantee the generated file ends with a newline.
        body += lifted_body if lifted_body.endswith("\n") else lifted_body + "\n"
    return (
        f"from simvx.core import {super_type}\n"
        "\n"
        "\n"
        f"class {class_name}({super_type}):\n"
        "    def __init__(self, **kwargs):\n"
        f"{body}"
    )


def _rewrite_var_refs(text: str, child_var: str) -> str | None:
    """Replace every free-standing reference to ``child_var`` in ``text`` with
    ``self``, working on tokens rather than raw text.

    Left untouched: attribute names (``x.<child_var>``), keyword-argument
    names (``f(<child_var>=...)``), and anything inside string literals or
    comments. Returns None when ``text`` cannot be tokenized.
    """
    import io
    import tokenize

    layout_tokens = {
        tokenize.NL,
        tokenize.NEWLINE,
        tokenize.COMMENT,
        tokenize.INDENT,
        tokenize.DEDENT,
        tokenize.ENDMARKER,
    }
    try:
        tokens = [
            tok
            for tok in tokenize.generate_tokens(io.StringIO(text).readline)
            if tok.type not in layout_tokens
        ]
    except (tokenize.TokenError, SyntaxError):
        return None

    # (row index, start column, end column) of each name to replace.
    hits: list[tuple[int, int, int]] = []
    depth = 0  # bracket nesting; `name =` inside brackets is a keyword argument
    for idx, tok in enumerate(tokens):
        if tok.type == tokenize.OP:
            if tok.string in ("(", "[", "{"):
                depth += 1
            elif tok.string in (")", "]", "}"):
                depth -= 1
            continue
        if tok.type != tokenize.NAME or tok.string != child_var:
            continue
        prev = tokens[idx - 1] if idx else None
        if prev is not None and prev.type == tokenize.OP and prev.string == ".":
            continue  # attribute name, not a reference to the variable
        nxt = tokens[idx + 1] if idx + 1 < len(tokens) else None
        if depth > 0 and nxt is not None and nxt.type == tokenize.OP and nxt.string == "=":
            continue  # keyword-argument name
        hits.append((tok.start[0] - 1, tok.start[1], tok.end[1]))

    # tokenize splits rows on "\n" only, so split the same way to keep the
    # row/column coordinates aligned.
    rows = text.split("\n")
    # Apply right-to-left so earlier column offsets stay valid.
    for row, start, end in sorted(hits, reverse=True):
        line = rows[row]
        rows[row] = f"{line[:start]}self{line[end:]}"
    return "\n".join(rows)


def _lifted_body_from_blocks(lift_blocks: list, child_var: str) -> str:
    """Render lifted statements as a body suitable for the new class's
    ``__init__``.

    Every reference to ``<child_var>`` in a statement is rewritten to
    ``self`` — not just the leading ``<child_var>.`` head — so statements
    such as ``<child_var>.add_child(Weapon(owner=<child_var>))`` stay valid
    inside the new class. Each line is normalised to a single 8-space
    indent, and blank lines before and after each statement are dropped.

    Returns ``""`` for an empty ``lift_blocks``.
    """
    if not lift_blocks:
        return ""
    head = f"{child_var}."
    lines: list[str] = []
    for stmt in lift_blocks:
        text = stmt.get_code()
        rewritten = _rewrite_var_refs(text, child_var)
        stmt_lines: list[str] = []
        for raw in (text if rewritten is None else rewritten).splitlines():
            stripped = raw.lstrip(" \t")
            if rewritten is None and stripped.startswith(head):
                # Fallback when tokenizing failed: rewrite only the leading
                # `<child_var>.` head of a line.
                stripped = f"self.{stripped[len(head):]}"
            stmt_lines.append(f"        {stripped}" if stripped else "")
        # Drop blank lines that came from the statement's surrounding prefix.
        while stmt_lines and not stmt_lines[-1].strip():
            stmt_lines.pop()
        while stmt_lines and not stmt_lines[0].strip():
            stmt_lines.pop(0)
        lines.extend(stmt_lines)
    return "\n".join(lines) + "\n"


def _is_self_add_child_for(stmt, child_var: str) -> bool:
    """True iff ``stmt`` is exactly ``self.add_child(<child_var>)``."""
    from parso.tree import Leaf

    inner = stmt.children[0] if stmt.children else None
    if inner is None or inner.type != "atom_expr":
        return False
    children = inner.children
    if len(children) < 3:
        return False
    head = children[0]
    if not isinstance(head, Leaf) or head.value != "self":
        return False
    # Walk: self . add_child ( <child_var> )
    text = inner.get_code().strip()
    if not text.startswith("self.add_child(") or not text.endswith(")"):
        return False
    arg = text[len("self.add_child(") : -1].strip()
    return arg == child_var


def _stmt_uses_var_as_head(stmt, var_name: str) -> bool:
    """True iff ``stmt``'s primary expression begins with ``<var_name>.``
    (attribute access) or ``<var_name>(`` (call).

    Used to decide whether a statement should be lifted with the var.
    """
    from parso.tree import Leaf

    def _headed_by_var(node) -> bool:
        # An atom_expr whose first child is the bare name `var_name`.
        if node.type != "atom_expr" or not node.children:
            return False
        head = node.children[0]
        return isinstance(head, Leaf) and head.type == "name" and head.value == var_name

    inner = stmt.children[0] if stmt.children else None
    if inner is None:
        return False
    if inner.type == "atom_expr" and inner.children:
        return _headed_by_var(inner)
    if inner.type == "expr_stmt" and inner.children:
        # `<var>.attr = value` — LHS is atom_expr starting with var.
        return _headed_by_var(inner.children[0])
    return False


def _stmt_references_name(stmt, name: str) -> bool:
    """Walk ``stmt`` for any leaf with value ``name`` of type 'name'."""
    from parso.tree import Leaf

    if isinstance(stmt, Leaf):
        return stmt.type == "name" and stmt.value == name
    return any(_stmt_references_name(c, name) for c in getattr(stmt, "children", ()))


def _foreign_local_refs(stmt, locals_set: set[str], *, exclude: set[str]) -> set[str]:
    """Names from ``locals_set`` (minus ``exclude``) referenced inside ``stmt``.

    Uses syntactic walk: any ``name`` leaf whose value matches a local var
    counts as a reference. We deliberately don't handle attribute accesses
    specially — `self.foo` walks `self` as a name leaf, but `self` is never in
    `locals_set` for a scene's ``__init__``.
    """
    from parso.tree import Leaf

    candidates = locals_set - exclude
    found: set[str] = set()

    def walk(node):
        if isinstance(node, Leaf):
            if node.type == "name" and node.value in candidates:
                found.add(node.value)
            return
        for c in getattr(node, "children", ()):
            walk(c)

    walk(stmt)
    return found