"""Discovery + cache for user-defined Node subclasses in the active project.
Walks the project ``src/`` directory (configurable via
``simvx.toml [editor].class_files_dir``) and parses each ``.py`` file with
:func:`simvx.core.scene_io.parse_source` to find top-level ``class X(...)``
definitions whose base list references a known ``Node`` subclass name.
The result feeds the Add Node dialog's class picker so users can instantiate
their own classes alongside engine built-ins. We deliberately *do not* import
the project files -- the dialog only needs the class name, module path, and
file location for display + lazy resolution at instantiation time.
Cache invalidation is mtime-based: each call to :meth:`ProjectClassIndex.refresh`
re-stats every ``.py`` file under the configured root and re-parses any whose
``mtime`` changed since the last scan. Files with syntax errors are skipped
silently (the parser still returns a tree thanks to error recovery, but we
treat any reported error as "skip and try again next refresh").
"""
from __future__ import annotations
import importlib
import logging
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from simvx.core import Node
from simvx.core.scene_io import (
UseSiteRef,
find_class_uses,
parse_source,
rename_class_in_source,
rename_module_in_imports,
)
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class ProjectClass:
    """Immutable record describing one user-defined class found in project source.

    Instances are produced by scanning files, not by importing them, so every
    field is plain data:

    * ``name`` -- the class identifier as written in the ``class`` statement.
    * ``module_path`` -- dotted module name relative to the scanned root
      (e.g. ``player.attack``); may be empty for a root-level ``__init__.py``.
    * ``file_path`` -- filesystem path of the defining file, kept so the
      editor can open the source on demand.
    * ``bases`` -- bare names of the base classes listed in the definition.
    """

    name: str
    module_path: str
    file_path: Path
    bases: tuple[str, ...]

    @property
    def display_subtitle(self) -> str:
        """Secondary picker label: the module path, or the file stem when it is empty."""
        if self.module_path:
            return self.module_path
        return self.file_path.stem
def _builtin_node_subclass_names() -> set[str]:
    """Collect the ``__name__`` of ``Node`` and of every loaded class derived from it.

    The class hierarchy is explored through ``__subclasses__()`` starting at
    ``Node``, so only classes that have already been imported into the running
    process are reported.
    """
    discovered: set[type] = {Node}
    frontier: list[type] = [Node]
    while frontier:
        # Take one class off the frontier and queue the subclasses not met yet.
        fresh = [sub for sub in frontier.pop().__subclasses__() if sub not in discovered]
        discovered.update(fresh)
        frontier.extend(fresh)
    return {klass.__name__ for klass in discovered}
def _module_path_from(file_path: Path, root: Path) -> str:
    """Derive the dotted module name of ``file_path`` relative to ``root``.

    ``root/foo/bar.py`` becomes ``foo.bar``; a package initialiser such as
    ``root/foo/__init__.py`` becomes ``foo`` (and ``root/__init__.py`` becomes
    the empty string).
    """
    segments = file_path.relative_to(root).with_suffix("").parts
    if segments and segments[-1] == "__init__":
        # The package itself is the module, not its ``__init__`` file.
        segments = segments[:-1]
    return ".".join(segments)
def _extract_base_names(classdef) -> tuple[str, ...]:
    """Return the bare names referenced in a parso ``classdef``'s base list.

    Handles ``class X(A)``, ``class X(A, B)``, and dotted bases like
    ``class X(simvx.core.Node)`` (only the trailing identifier is kept --
    that's enough to match against ``Node.__subclasses__()`` names).

    Entries that are not base classes are ignored: ``argument`` nodes
    (``metaclass=Meta``, ``*mixins``, ``**kwargs``) are skipped, and anything
    that does not resolve to an identifier is dropped. Returns an empty tuple
    when the class has no parenthesised base list.
    """
    # parso: classdef = ['class', NAME, '(', arglist|name, ')', ':', suite]
    children = classdef.children
    values = [getattr(child, "value", None) for child in children]
    try:
        open_paren = values.index("(")
        close_paren = values.index(")", open_paren)
    except ValueError:
        return ()
    inner = children[open_paren + 1 : close_paren]
    # Multiple bases come wrapped in 'arglist'; a single base is the node itself.
    arglist = next((n for n in inner if getattr(n, "type", None) == "arglist"), None)
    candidates = arglist.children if arglist is not None else inner
    names: list[str] = []
    for node in candidates:
        if getattr(node, "type", None) == "argument":
            # Keyword / star arguments are class-creation options, not bases.
            continue
        name = _trailing_name(node)
        if name:  # commas and other punctuation resolve to ""
            names.append(name)
    return tuple(names)


def _trailing_name(node) -> str:
    """Return the last identifier of a possibly-dotted base reference.

    A leaf yields its ``value`` when that value is an identifier and ``""``
    otherwise (operators, brackets, literals). For a compound node the dotted
    path is followed through ``.name`` trailers and the walk stops at the
    first call or subscript trailer, so ``pkg.Base[T]`` resolves to ``Base``
    rather than to something inside the brackets.
    """
    children = getattr(node, "children", None)
    if not children:
        value = getattr(node, "value", "")
        return value if isinstance(value, str) and value.isidentifier() else ""
    last = ""
    for child in children:
        if getattr(child, "type", None) == "trailer":
            parts = getattr(child, "children", None) or []
            if parts and getattr(parts[0], "value", None) == ".":
                # Attribute access: the name after the dot extends the path.
                last = _trailing_name(parts[-1]) or last
                continue
            # '(...)' or '[...]': the class reference ends before this trailer.
            break
        last = _trailing_name(child) or last
    return last
class ProjectClassIndex:
    """Mtime-cached index of user-defined Node subclasses under a project root.

    Construct once per project session; call :meth:`refresh` whenever the
    picker is about to be shown. ``refresh`` is cheap when nothing changed
    (one ``stat`` per ``.py`` file plus in-memory bookkeeping); a file is only
    re-read and re-parsed when its mtime differs from the cached one or when
    its cache entry was dropped.

    Inheritance is matched by bare class *name*: a class is indexed when one
    of its base names is a loaded ``Node`` subclass name or the name of
    another indexed project class. Chains of project classes are resolved to
    a fixed point within a single :meth:`refresh`.
    """

    def __init__(self, project_path: Path | None = None, *, src_subdir: str = "src") -> None:
        self._project_path: Path | None = Path(project_path) if project_path else None
        self._src_subdir = src_subdir
        # file_path -> (mtime_ns, [ProjectClass, ...])
        self._cache: dict[Path, tuple[int, list[ProjectClass]]] = {}
        # file_path -> (mtime_ns, ((class_name, base_names), ...)) for *every*
        # top-level class in the file, Node-derived or not. Keeping the raw
        # declarations lets refresh() re-resolve inheritance without re-parsing.
        self._declarations: dict[Path, tuple[int, tuple[tuple[str, tuple[str, ...]], ...]]] = {}

    @property
    def project_path(self) -> Path | None:
        """The project directory currently indexed, or ``None`` when unset."""
        return self._project_path

    def set_project_path(self, project_path: Path | None) -> None:
        """Switch the index to a new project; clears the cache."""
        new = Path(project_path) if project_path else None
        if new != self._project_path:
            self._project_path = new
            self._cache.clear()
            self._declarations.clear()

    def set_src_subdir(self, subdir: str) -> None:
        """Configure the scanned subdirectory (default ``src``)."""
        if subdir != self._src_subdir:
            self._src_subdir = subdir
            self._cache.clear()
            self._declarations.clear()

    @property
    def root(self) -> Path | None:
        """Directory walked for ``.py`` files, or ``None`` if unset or missing on disk."""
        if self._project_path is None:
            return None
        candidate = self._project_path / self._src_subdir
        return candidate if candidate.exists() else None

    def refresh(self) -> list[ProjectClass]:
        """Rescan the project root, re-parsing only files with changed mtime.

        Returns the full list of discovered classes, sorted by
        ``(module_path, name)``. When there is no usable root the cache is
        emptied and an empty list is returned.
        """
        root = self.root
        if root is None:
            self._cache.clear()
            self._declarations.clear()
            return []

        live_files: set[Path] = set()
        for file_path in root.rglob("*.py"):
            if not file_path.is_file():
                continue
            try:
                mtime_ns = file_path.stat().st_mtime_ns
            except OSError:
                # Vanished between listing and stat: keep it out of
                # ``live_files`` so any cached entry is dropped below.
                continue
            live_files.add(file_path)
            cached = self._cache.get(file_path)
            declared = self._declarations.get(file_path)
            if cached is not None and declared is not None and cached[0] == declared[0] == mtime_ns:
                continue
            # New, modified, or explicitly invalidated (entry popped from
            # ``_cache``): read and parse the file again.
            self._declarations[file_path] = (mtime_ns, self._parse_declarations(file_path))

        # Drop entries for files that disappeared since the last scan.
        for store in (self._cache, self._declarations):
            for stale in [p for p in store if p not in live_files]:
                del store[stale]

        # Resolve inheritance across project files purely in memory, then
        # rebuild the public cache from the stored declarations.
        node_names = self._resolve_node_names(_builtin_node_subclass_names())
        for file_path, (mtime_ns, declarations) in self._declarations.items():
            self._cache[file_path] = (
                mtime_ns,
                self._build_classes(file_path, root, declarations, node_names),
            )
        return self.all()

    def all(self) -> list[ProjectClass]:
        """Return the cached list without rescanning."""
        return sorted(
            (c for _, classes in self._cache.values() for c in classes),
            key=lambda c: (c.module_path, c.name),
        )

    def iter_use_sites(self, class_name: str) -> Iterator[tuple[Path, UseSiteRef]]:
        """Yield ``(file_path, use_site)`` pairs for every reference to ``class_name``.

        Reads and parses every ``.py`` file under :attr:`root` directly (the
        class cache is neither consulted nor updated) and runs
        :func:`simvx.core.scene_io.find_class_uses` against each parsed
        source. Files that cannot be read, decoded, or parsed without errors
        are skipped silently. Yields nothing when there is no usable root.
        """
        root = self.root
        if root is None:
            return
        for file_path in root.rglob("*.py"):
            if not file_path.is_file():
                continue
            try:
                source = file_path.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                continue
            try:
                tree = parse_source(source)
            except Exception:
                continue
            if tree.errors:
                continue
            for use in find_class_uses(tree, class_name):
                yield file_path, use

    def resolve(self, project_class: ProjectClass) -> type[Node] | None:
        """Import the project module and return the class object, or None.

        The picker stores ``ProjectClass`` records (name + module path) so the
        editor can display the list without executing user code. When the user
        actually picks an entry, the editor calls this to import the module
        and resolve the live class. Import failures are logged and return
        ``None``; ``None`` is also returned when the module path is empty or
        the attribute is missing or is not a ``Node`` subclass -- callers
        should surface a user-visible error in that case.
        """
        if not project_class.module_path:
            return None
        try:
            # NOTE(review): assumes the scanned root is importable (on
            # ``sys.path``) -- nothing in this class arranges that.
            module = importlib.import_module(project_class.module_path)
        except Exception:
            log.exception("project_classes: failed to import %s", project_class.module_path)
            return None
        cls = getattr(module, project_class.name, None)
        if cls is None or not isinstance(cls, type) or not issubclass(cls, Node):
            return None
        return cls

    # -- internals --

    def _parse_declarations(self, file_path: Path) -> tuple[tuple[str, tuple[str, ...]], ...]:
        """Parse one file into ``(class_name, base_names)`` pairs for its top-level classes.

        Returns an empty tuple when the file cannot be read or decoded, when
        parsing raises, or when the parser reports any syntax error.
        """
        try:
            source = file_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            return ()
        try:
            tree = parse_source(source)
        except Exception:
            log.debug("project_classes: parse failed for %s", file_path, exc_info=True)
            return ()
        if tree.errors:
            # File has syntax errors -- skip silently; it is parsed again
            # once its mtime changes.
            return ()
        return tuple(
            (classdef.name.value, _extract_base_names(classdef)) for classdef in tree.iter_classes()
        )

    def _resolve_node_names(self, builtin_names: set[str]) -> set[str]:
        """Extend ``builtin_names`` with every project class name that derives from them.

        Iterates to a fixed point so chains such as ``A(Node)``, ``B(A)``,
        ``C(B)`` are fully resolved regardless of file order.
        """
        known = set(builtin_names)
        declared = [decl for _, decls in self._declarations.values() for decl in decls]
        grew = True
        while grew:
            grew = False
            for name, bases in declared:
                if name not in known and any(b in known for b in bases):
                    known.add(name)
                    grew = True
        return known

    def _build_classes(self, file_path: Path, root: Path, declarations, known_names) -> list[ProjectClass]:
        """Turn the declarations whose bases hit ``known_names`` into ``ProjectClass`` records."""
        kept = [(name, bases) for name, bases in declarations if any(b in known_names for b in bases)]
        if not kept:
            return []
        module_path = _module_path_from(file_path, root)
        return [
            ProjectClass(name=name, module_path=module_path, file_path=file_path, bases=bases)
            for name, bases in kept
        ]

    def _scan_file(self, file_path: Path, root: Path, builtin_names: set[str]) -> list[ProjectClass]:
        """Parse one file and return the top-level classes with a base in ``builtin_names``.

        Only direct name matches against ``builtin_names`` are considered
        here; chained project inheritance is resolved by :meth:`refresh`.
        """
        return self._build_classes(file_path, root, self._parse_declarations(file_path), builtin_names)
@dataclass(frozen=True)
class RenameResult:
    """Immutable summary returned by a project-wide :func:`rename_class` call."""

    # Paths of the source files whose contents were rewritten.
    files_modified: tuple[Path, ...]
    # ``(old_path, new_path)`` when the defining file was also moved, else ``None``.
    file_renamed: tuple[Path, Path] | None
def rename_class(
    project_index: ProjectClassIndex,
    old_name: str,
    new_name: str,
    *,
    rename_file: bool = False,
) -> RenameResult:
    """Rename a class everywhere it appears in the project.

    Updates the class definition + every importer + every instantiation
    site by parsing each affected file, running
    :func:`simvx.core.scene_io.rename_class_in_source` against it, and
    writing the result back. Two-phase: every new source is computed in
    memory first; nothing is written until all files rewrote successfully.
    If a write (or the optional file move) fails with :class:`OSError`,
    every file touched so far -- including the one being written when the
    failure happened -- is restored from its in-memory snapshot and the
    error is re-raised.

    When ``rename_file=True`` and the definition lives in a file whose
    stem matches the old class's snake_case form (or in an ``__init__.py``
    whose folder name matches), the defining file is also moved to the
    matching new name and ``from <old_module> import ...`` lines in the
    affected files are rewritten. For the folder case only ``__init__.py``
    is moved; the old folder is removed only if that leaves it empty.

    Returns the files modified and (optionally) the file rename.

    Raises :class:`ValueError` if ``old_name`` has no definition in the
    project, if ``new_name`` already names another project class, or if the
    file move would overwrite an existing file.
    """
    if old_name == new_name:
        return RenameResult(files_modified=(), file_renamed=None)

    project_index.refresh()
    all_classes = project_index.all()
    old_match = next((c for c in all_classes if c.name == old_name), None)
    if old_match is None:
        raise ValueError(f"rename_class: no class {old_name!r} found in project")
    if any(c.name == new_name for c in all_classes):
        raise ValueError(f"rename_class: target name {new_name!r} already in use")
    definition_path = old_match.file_path

    # Collect every file touched by this rename: definition file + every
    # use-site host. Deduplicate via a dict keyed by path; the values are
    # the original sources and double as rollback snapshots.
    affected: dict[Path, str] = {definition_path: definition_path.read_text(encoding="utf-8")}
    for file_path, _use in project_index.iter_use_sites(old_name):
        if file_path not in affected:
            affected[file_path] = file_path.read_text(encoding="utf-8")

    # If the user opted into file rename, work out where the defining file
    # moves to and which module path changes, so imports can be rewritten.
    move_target: Path | None = None
    module_rename: tuple[str, str] | None = None
    if rename_file:
        candidate = _rename_definition_file(definition_path, old_name, new_name)
        if candidate is not None and candidate != definition_path:
            move_target = candidate
    if move_target is not None:
        # ``Path.rename`` silently replaces an existing file on POSIX;
        # refuse up front, before anything is written.
        if move_target.exists():
            raise ValueError(f"rename_class: target file {move_target} already exists")
        old_module = old_match.module_path
        new_module = _module_path_for_renamed_file(old_match, new_name)
        if old_module and new_module and old_module != new_module:
            module_rename = (old_module, new_module)

    # Phase 1: compute every new source in memory.
    new_sources: dict[Path, str] = {}
    for path, original in affected.items():
        tree = parse_source(original)
        rename_class_in_source(tree, old_name, new_name)
        if module_rename is not None:
            rename_module_in_imports(tree, module_rename[0], module_rename[1])
        new_sources[path] = tree.dump()

    # Phase 2: write each new source, then move the defining file. Track
    # what was touched so everything can be rolled back on failure.
    written: list[Path] = []
    file_renamed: tuple[Path, Path] | None = None
    try:
        for path, new_source in new_sources.items():
            if new_source == affected[path]:
                continue  # no-op for this file
            # Record before writing: a write that fails midway may leave
            # the file truncated, so it must be part of the rollback.
            written.append(path)
            path.write_text(new_source, encoding="utf-8")
        if move_target is not None:
            # Folder rename: make sure the destination folder exists.
            move_target.parent.mkdir(parents=True, exist_ok=True)
            definition_path.rename(move_target)
            file_renamed = (definition_path, move_target)
    except OSError:
        # Roll back: restore the original text of every file we touched.
        for path in written:
            try:
                path.write_text(affected[path], encoding="utf-8")
            except OSError:
                log.exception("project_classes.rename_class: rollback failed for %s", path)
        raise

    if file_renamed is not None and definition_path.name == "__init__.py":
        # Folder-as-scene rename: remove the old folder if it is now empty.
        try:
            definition_path.parent.rmdir()
        except OSError:
            log.debug(
                "project_classes.rename_class: old folder %s left in place",
                definition_path.parent,
                exc_info=True,
            )

    # The cache would invalidate on the next refresh anyway; drop the
    # definition's entry and refresh now so callers see the new state.
    project_index._cache.pop(definition_path, None)
    project_index.refresh()
    return RenameResult(
        files_modified=tuple(written),
        file_renamed=file_renamed,
    )
def _module_path_for_renamed_file(old_class: ProjectClass, new_class_name: str) -> str | None:
    """Work out the dotted module path a class's module gets after a file rename.

    Applies the same two conventions as :func:`_rename_definition_file`:
    either the file stem, or the folder holding an ``__init__.py``, equals the
    snake_case form of the old class name. In both cases the last segment of
    the module path is swapped for the snake_case form of the new class name
    (an empty module path simply becomes that name). Returns ``None`` when
    neither convention applies, in which case the caller skips the module
    rename.
    """
    old_stem = _snake_case(old_class.name)
    source_file = old_class.file_path
    matches_file = source_file.stem == old_stem
    matches_folder = source_file.name == "__init__.py" and source_file.parent.name == old_stem
    if not (matches_file or matches_folder):
        return None

    replacement = _snake_case(new_class_name)
    if not old_class.module_path:
        return replacement
    segments = old_class.module_path.split(".")
    return ".".join([*segments[:-1], replacement])
def _rename_definition_file(file_path: Path, old_class: str, new_class: str) -> Path | None:
    """Compute where a class's defining file should move when the class is renamed.

    Nothing is touched on disk; only the target path is calculated:

    * file stem equals the snake_case of ``old_class`` (``player.py`` for
      ``Player``) -- same directory, new stem, same suffix;
    * file is ``<folder>/__init__.py`` and ``<folder>`` equals that
      snake_case name -- ``__init__.py`` inside a sibling folder named after
      the new class.

    Returns ``None`` when neither convention applies (the caller leaves the
    file alone).
    """
    wanted = _snake_case(old_class)
    if file_path.stem == wanted:
        return file_path.with_name(_snake_case(new_class) + file_path.suffix)

    # Folder-as-scene: the package folder, not the file, carries the name.
    folder = file_path.parent
    if file_path.name == "__init__.py" and folder.name == wanted:
        return folder.with_name(_snake_case(new_class)) / "__init__.py"
    return None
def _snake_case(name: str) -> str:
    """Lower-case ``name``, inserting ``_`` before each capital that follows a non-capital.

    ``CamelCase`` becomes ``camel_case``; a run of capitals is kept together
    (``HTTPServer`` becomes ``httpserver``). Mirrors the editor's convention
    for filenames generated by Make Custom Class.
    """
    pieces: list[str] = []
    previous = ""
    for ch in name:
        # A word boundary is an upper-case char preceded by a non-upper-case one.
        if ch.isupper() and previous and not previous.isupper():
            pieces.append("_")
        pieces.append(ch.lower())
        previous = ch
    return "".join(pieces)