"""Pickled game-save persistence for nodes with ``persist=True`` Properties.
``SaveManager`` walks a live scene tree, snapshots every ``Property`` flagged
``persist=True``, and round-trips the data through pickle. Saves are atomic
(write-temp + ``fsync`` + rename) and rotate two backups on each write
(``<slot>.sav.bak``, ``<slot>.sav.bak2``).
Per-class versioning is supported via ``__save_version__`` (defaults to ``1``).
When the stored version is older than the class version, ``apply`` invokes
``cls.__migrate_save__(values, from_v, to_v)`` if defined; otherwise it
strict-raises. Newer-than-supported versions always strict-raise.
Example::
from simvx.core import SaveManager, Node, Property
class Player(Node):
__save_version__ = 1
hp = Property(100, persist=True, save_version=1)
mgr = SaveManager() # saves under <cwd>/saves
path = mgr.save(root, "slot1") # → <cwd>/saves/slot1.sav
data = mgr.load("slot1")
mgr.apply(root, data)
"""
from __future__ import annotations
import importlib
import logging
import os
import pickle
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from . import __version__ as _engine_version
from .node import Node
log = logging.getLogger(__name__)
#: Format version of the save container itself. Bump when the *envelope*
#: changes (top-level keys, node-entry shape). Per-class data is versioned
#: independently via ``__save_version__``.
SAVE_FORMAT_VERSION = 1
# ---------------------------------------------------------------------------
# Atomic pickle write + two-deep backup rotation.
#
# Shared by ``SaveManager`` (game saves, ``persist=True`` Properties) and the
# editor's autosave/crash-recovery snapshots, which need the same crash-safety
# guarantee on a different payload (full ``_serialize_node`` tree dumps).
# Keeping one implementation avoids the rotation logic drifting between
# consumers.
# ---------------------------------------------------------------------------
[docs]
def pickle_atomic(path: Path, obj: Any) -> Path:
"""Pickle ``obj`` to ``path`` atomically, rotating two backups.
Writes ``<path>.tmp``, fsyncs, then renames into place. Before the rename
lands, the previous ``<path>`` is rotated to ``<path>.bak`` and any
pre-existing ``<path>.bak`` becomes ``<path>.bak2``. At every instant
either the old or the new file is fully on disk -- the temp file is the
only state that can be partial, and it never replaces the live file until
fsync has succeeded.
Raises ``OSError`` on write failure (the temp file is best-effort cleaned).
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_name(path.name + ".tmp")
payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
try:
with open(tmp_path, "wb") as fh:
fh.write(payload)
fh.flush()
os.fsync(fh.fileno())
except OSError:
try:
tmp_path.unlink()
except OSError:
pass
raise
bak_path = path.with_name(path.name + ".bak")
bak2_path = path.with_name(path.name + ".bak2")
if path.exists():
if bak_path.exists():
os.replace(bak_path, bak2_path)
os.replace(path, bak_path)
os.replace(tmp_path, path)
return path
# ---------------------------------------------------------------------------
# Class encoding — stable ``"module.QualName"`` strings for the ``class`` field.
# Independent of scene.py's ``_encode_type`` (which encodes Vec/Quat values,
# not classes). Pickle natively preserves Vec/Vec3/Quat so no value codec is
# needed here.
# ---------------------------------------------------------------------------
def _encode_class(cls: type) -> str:
return f"{cls.__module__}.{cls.__qualname__}"
def _decode_class(spec: str) -> type:
module_name, _, qualname = spec.rpartition(".")
if not module_name:
raise ValueError(f"Invalid class spec {spec!r}: expected '<module>.<qualname>'")
try:
module = importlib.import_module(module_name)
except ImportError as exc:
raise ImportError(f"Cannot import module {module_name!r} for class {spec!r}: {exc}") from exc
obj: Any = module
for part in qualname.split("."):
try:
obj = getattr(obj, part)
except AttributeError as exc:
raise AttributeError(f"Cannot resolve class {spec!r}: missing attribute {part!r}") from exc
if not isinstance(obj, type):
raise TypeError(f"Resolved {spec!r} but it is not a class (got {type(obj).__name__})")
return obj
def _node_by_path(root: Node, path: str) -> Node | None:
"""Resolve a stable path emitted by ``Node.walk`` back to a live node.
``Node.path`` returns ``"/Root/Child/Grandchild"``. ``Node.get_node`` does
not accept its own root (``"/Root"``) when called *on* that root, so we
match the leading segment and recurse from there.
"""
if not path.startswith("/"):
return None
parts = [p for p in path.split("/") if p]
if not parts or parts[0] != root.name:
return None
if len(parts) == 1:
return root
try:
return root.get_node("/".join(parts[1:]))
except (KeyError, ValueError):
return None
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
class SaveManager:
"""Save / load / apply persisted Property snapshots for a node tree.
Attributes:
save_dir: Directory where ``<slot>.sav`` files live. Created on first
save. Defaults to ``<cwd>/saves`` when not provided.
"""
__slots__ = ("save_dir",)
def __init__(self, save_dir: Path | str | None = None) -> None:
self.save_dir = Path(save_dir) if save_dir is not None else Path.cwd() / "saves"
# -- save -------------------------------------------------------------
[docs]
def save(self, root: Node, slot: str) -> Path:
"""Snapshot the tree and write ``<save_dir>/<slot>.sav`` atomically.
Returns the final path. Raises ``OSError`` on write failure (after
rotating nothing — partial state is never observable).
"""
if not slot:
raise ValueError("slot must be a non-empty string")
nodes_payload: list[dict[str, Any]] = []
for node in root.walk():
values: dict[str, Any] = {}
for name, prop in type(node).get_properties().items():
if not prop.persist:
continue
values[name] = getattr(node, name)
if not values:
continue
cls = type(node)
nodes_payload.append({
"path": node.path,
"class": _encode_class(cls),
"save_version": int(getattr(cls, "__save_version__", 1)),
"values": values,
})
envelope = {
"format_version": SAVE_FORMAT_VERSION,
"engine_version": _engine_version,
"saved_at": datetime.now(UTC).isoformat(),
"nodes": nodes_payload,
}
return pickle_atomic(self.save_dir / f"{slot}.sav", envelope)
# -- load -------------------------------------------------------------
[docs]
def load(self, slot: str) -> dict:
"""Read ``<save_dir>/<slot>.sav`` and return the decoded envelope.
No fallback to ``.bak`` — backups are for manual recovery only.
"""
if not slot:
raise ValueError("slot must be a non-empty string")
path = self.save_dir / f"{slot}.sav"
if not path.exists():
raise FileNotFoundError(f"No save file at {path}")
data = pickle.loads(path.read_bytes())
if not isinstance(data, dict):
raise RuntimeError(f"Save file {path} is corrupt: top-level is {type(data).__name__}, expected dict")
fmt = data.get("format_version")
if fmt is None:
raise RuntimeError(f"Save file {path} missing 'format_version' key")
if fmt > SAVE_FORMAT_VERSION:
raise RuntimeError(
f"Save file {path} has format_version={fmt}, "
f"this build only supports up to {SAVE_FORMAT_VERSION}"
)
return data
# -- apply ------------------------------------------------------------
[docs]
def apply(self, root: Node, data: dict) -> None:
"""Restore values from a loaded envelope onto a live tree.
Strict-raises if a stored node path is missing, the class fails to
resolve, or per-class versions are incompatible without a migration
hook.
"""
nodes = data.get("nodes")
if not isinstance(nodes, list):
raise RuntimeError("Save data missing 'nodes' list")
for entry in nodes:
path = entry["path"]
cls_spec = entry["class"]
stored_version = int(entry.get("save_version", 1))
values = dict(entry["values"])
target = _node_by_path(root, path)
if target is None:
raise RuntimeError(f"Cannot apply save: no node at path {path!r} (class {cls_spec!r})")
cls = _decode_class(cls_spec)
current_version = int(getattr(cls, "__save_version__", 1))
if stored_version > current_version:
raise RuntimeError(
f"Save entry for {path!r} has save_version={stored_version} "
f"but {cls_spec!r} only supports up to {current_version}"
)
if stored_version < current_version:
migrate = getattr(cls, "__migrate_save__", None)
if migrate is None:
raise RuntimeError(
f"Save entry for {path!r} ({cls_spec!r}) is at version "
f"{stored_version}, current is {current_version}, and no "
"__migrate_save__ classmethod is defined"
)
values = migrate(values, stored_version, current_version)
if not isinstance(values, dict):
raise RuntimeError(
f"{cls_spec}.__migrate_save__ must return a dict, got {type(values).__name__}"
)
props = type(target).get_properties()
for name, value in values.items():
prop = props.get(name)
if prop is None:
raise RuntimeError(
f"Save entry for {path!r} carries unknown Property {name!r} "
f"(class {cls_spec!r} has no such descriptor)"
)
# Route through the descriptor so validation + on_change fire.
prop.__set__(target, value)