Source code for simvx.core.resource_loader

"""
Background resource loading -- threaded loading with progress tracking.

Provides a Godot-style interface for background resource loading with status
polling, suitable for loading screens and smooth level transitions.

Public API::

    from simvx.core.resource_loader import ResourceLoader
    from simvx.core import Resource

    ResourceLoader.load_threaded_request(Resource("game.assets", "ship.png"))
    ResourceLoader.load_threaded_request("textures/ship.png")
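    while True:
        status, progress = ResourceLoader.load_threaded_get_status()
        if status == "loaded":
            data = ResourceLoader.load_threaded_get()
            break
        if status in ("error", "cancelled"):
            # Terminal failure states: stop polling instead of spinning forever.
            raise RuntimeError(f"loading failed with status {status!r}")
        update_loading_bar(progress)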

The default loader reads any source (filesystem path, ``Resource``, or
``Traversable``) as raw bytes. Use :meth:`register_loader` to install custom
type-dispatched loaders.
"""

from __future__ import annotations

import logging
import os
import threading
import time
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from enum import StrEnum
from importlib.resources.abc import Traversable
from typing import TYPE_CHECKING, Any, Union

from .asset_resolver import resolve_asset_path

if TYPE_CHECKING:
    from .resource import Resource

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)

# Names exported by ``from simvx.core.resource_loader import *``.
__all__ = ["ResourceLoader", "LoadStatus", "AssetSpec"]

# Every kind of source a load request accepts. "Resource" is a string forward
# reference because the class is only imported under TYPE_CHECKING above.
AssetSpec = Union[str, os.PathLike, "Resource", Traversable]

[docs] class LoadStatus(StrEnum): """Status of a background load request.""" IDLE = "idle" LOADING = "loading" LOADED = "loaded" ERROR = "error" CANCELLED = "cancelled"
def _request_key(spec: AssetSpec) -> str:
    """Build the string key under which a load request for *spec* is tracked.

    The key carries a prefix naming the kind of source (``resource:``,
    ``path:`` or ``traversable:``) so that two requests for the same source
    map to the same entry.

    Raises:
        TypeError: If *spec* is none of the supported source kinds.
    """
    # Imported here rather than at module level; the module-level import of
    # Resource is guarded by TYPE_CHECKING.
    from .resource import Resource

    if isinstance(spec, Resource):
        return f"resource:{spec.package}:{spec.name}"

    # Plain paths are handled before the Traversable check so that a path
    # object is always keyed as a path, never as a traversable.
    if isinstance(spec, (str, os.PathLike)):
        return f"path:{os.fspath(spec)}"

    if isinstance(spec, Traversable):
        return f"traversable:{spec!s}"

    raise TypeError(
        f"ResourceLoader expects str | os.PathLike | Resource | Traversable, got {type(spec).__name__}"
    )


class _LoadRequest:
    """Bookkeeping record for one background load (internal)."""

    __slots__ = (
        "spec",
        "future",
        "status",
        "progress",
        "result",
        "error",
        "loader",
        "deadline",
        "cancelled",
    )

    def __init__(self, spec: AssetSpec, loader: Callable[[AssetSpec], Any]):
        # What to load and the callable that loads it.
        self.spec = spec
        self.loader = loader

        # Scheduling state; ``future`` and ``deadline`` are filled in by the
        # code that submits the request.
        self.future: Future | None = None
        self.deadline: float | None = None
        self.cancelled: bool = False

        # Outcome state; a request starts out as "loading" with no progress.
        self.status = LoadStatus.LOADING
        self.progress: float = 0.0
        self.result: Any = None
        self.error: Exception | None = None
class ResourceLoader:
    """Singleton threaded resource loader with progress tracking.

    Pool size and a default per-load timeout can be configured via
    :meth:`configure` before first use. Call :meth:`reset` first if the
    singleton has already been created.

    Requests are tracked in insertion order under a string key derived from
    the source spec; the "most recent request" used by the ``spec=None`` forms
    of the query methods is the last entry of that mapping.
    """

    _instance: ResourceLoader | None = None
    # Guards creation, replacement and configuration of the singleton.
    _lock = threading.Lock()
    _default_max_workers: int = 2
    _default_timeout: float | None = None

    def __init__(self):
        """Create the worker pool and empty request / loader registries."""
        self._pool = ThreadPoolExecutor(
            max_workers=self._default_max_workers,
            thread_name_prefix="resource_loader",
        )
        self._requests: dict[str, _LoadRequest] = {}
        # Type-dispatched custom loaders. Keys are types; values are callables.
        self._type_loaders: dict[type, Callable[[Any], Any]] = {}
        # Snapshot of the class-level default taken at construction time.
        self._default_timeout_override: float | None = self._default_timeout

    @classmethod
    def configure(cls, *, max_workers: int = 2, default_timeout: float | None = None) -> None:
        """Set pool size and default per-load timeout.

        Must be called before the singleton is first accessed. Call
        :meth:`reset` first to reconfigure an existing instance.

        Args:
            max_workers: Number of worker threads in the pool (>= 1).
            default_timeout: Default per-load timeout in seconds (> 0), or
                ``None`` for no deadline.

        Raises:
            ValueError: If an argument is out of range.
            RuntimeError: If the singleton already exists.
        """
        if max_workers < 1:
            raise ValueError(f"max_workers must be >= 1, got {max_workers}")
        if default_timeout is not None and default_timeout <= 0:
            raise ValueError(f"default_timeout must be > 0, got {default_timeout}")
        with cls._lock:
            if cls._instance is not None:
                raise RuntimeError(
                    "ResourceLoader.configure() must be called before first use. "
                    "Call ResourceLoader.reset() first to reconfigure."
                )
            cls._default_max_workers = max_workers
            cls._default_timeout = default_timeout

    @classmethod
    def get(cls) -> ResourceLoader:
        """Return the singleton instance, creating it on first access."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance between the first test and acquiring the lock.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    @classmethod
    def reset(cls):
        """Reset the singleton (for tests). Shuts down the thread pool.

        The instance is detached under the same lock that :meth:`get` and
        :meth:`configure` use, so a concurrent ``get()`` cannot observe a
        half-reset state. The pool is shut down without waiting for workers.
        """
        with cls._lock:
            inst, cls._instance = cls._instance, None
        if inst is not None:
            inst._pool.shutdown(wait=False)

    def register_loader(self, source_type: type, loader: Callable[[Any], Any]) -> None:
        """Register a custom loader keyed by *source_type*.

        ``isinstance(spec, source_type)`` selects the loader. Loaders are
        tested in the order their types were first registered, with the
        default file-bytes loader as fallback. Registering the same type again
        replaces its loader but keeps its original position.

        Raises:
            TypeError: If *source_type* is not a type or *loader* is not
                callable.
        """
        if not isinstance(source_type, type):
            raise TypeError(f"register_loader source_type must be a type, got {type(source_type).__name__}")
        # Fail here rather than later inside a worker thread.
        if not callable(loader):
            raise TypeError(f"register_loader loader must be callable, got {type(loader).__name__}")
        self._type_loaders[source_type] = loader

    def _select_loader(self, spec: AssetSpec) -> Callable[[Any], Any]:
        """Return the first registered loader whose type matches *spec*.

        Falls back to :meth:`_load_bytes` when no registered type matches.
        """
        for source_type, loader in self._type_loaders.items():
            if isinstance(spec, source_type):
                return loader
        return self._load_bytes

    @classmethod
    def load_threaded_request(
        cls,
        spec: AssetSpec,
        loader: Callable[[Any], Any] | None = None,
        *,
        timeout: float | None = None,
    ) -> None:
        """Start loading a resource in the background.

        A request whose key is already tracked with status "loading" is
        ignored (deduplicated). A tracked request in any other state is
        replaced by the new one.

        Args:
            spec: A filesystem path string / :class:`~os.PathLike`, a
                :class:`~simvx.core.Resource`, or an
                :class:`importlib.resources.abc.Traversable`.
            loader: Optional custom loader. Receives *spec* directly. If
                ``None``, a registered type-loader is selected, falling back
                to a bytes reader.
            timeout: Optional per-request timeout in seconds. Overrides the
                configured default.
        """
        key = _request_key(spec)
        inst = cls.get()
        if (existing := inst._requests.get(key)) is not None and existing.status == LoadStatus.LOADING:
            return  # already loading — dedup

        if loader is None:
            loader = inst._select_loader(spec)

        req = _LoadRequest(spec, loader)
        effective_timeout = timeout if timeout is not None else inst._default_timeout_override
        if effective_timeout is not None:
            # Deadlines use the monotonic clock so wall-clock jumps cannot
            # shorten or extend them.
            req.deadline = time.monotonic() + effective_timeout
        inst._requests[key] = req

        def _do_load():
            # Runs on a pool thread. Any loader exception is recorded on the
            # request instead of propagating through the future.
            try:
                result = req.loader(req.spec)
                if req.cancelled:
                    return  # discard result, status handled by cancel()
                req.result = result
                req.progress = 1.0
                req.status = LoadStatus.LOADED
            except Exception as e:
                if req.cancelled:
                    return
                req.error = e
                req.status = LoadStatus.ERROR
                log.error("resource_loader: failed to load %r: %s", req.spec, e)

        req.future = inst._pool.submit(_do_load)

    @classmethod
    def cancel(cls, spec: AssetSpec) -> bool:
        """Cancel a pending load request.

        The request is flagged as cancelled and its status becomes
        "cancelled". If the underlying future had not started running, it is
        cancelled and the request is removed from tracking. If the loader is
        already running (or has finished), the future cannot be cancelled:
        the request stays tracked with status "cancelled" and a still-running
        worker discards its result when it finishes.

        Returns:
            ``True`` only if the future was cancelled before it started;
            ``False`` if there is no request for *spec* or the future could
            not be cancelled.
        """
        key = _request_key(spec)
        inst = cls.get()
        req = inst._requests.get(key)
        if req is None:
            return False
        # Set the flag first so a worker finishing concurrently drops its result.
        req.cancelled = True
        cancelled = False
        if req.future is not None:
            cancelled = req.future.cancel()
        req.status = LoadStatus.CANCELLED
        if cancelled:
            inst._requests.pop(key, None)
        return cancelled

    @classmethod
    def load_threaded_get_status(cls, spec: AssetSpec | None = None) -> tuple[str, float]:
        """Check the status of a background load.

        If *spec* is ``None`` returns the most recent request's status.

        A request that is still loading past its deadline is switched to
        "error" here (with a :class:`TimeoutError` recorded on it) and its
        future is asked to cancel.

        Returns:
            ``(status, progress)`` where *status* is a :class:`LoadStatus`
            value string; ``("idle", 0.0)`` when no matching request exists.
        """
        inst = cls.get()
        if spec is None:
            if not inst._requests:
                return ("idle", 0.0)
            req = next(reversed(inst._requests.values()))
        else:
            key = _request_key(spec)
            if (req := inst._requests.get(key)) is None:
                return ("idle", 0.0)

        # Enforce deadline lazily.
        if (
            req.status == LoadStatus.LOADING
            and req.deadline is not None
            and time.monotonic() >= req.deadline
        ):
            req.error = TimeoutError(f"Load of {req.spec!r} exceeded deadline")
            req.status = LoadStatus.ERROR
            if req.future is not None:
                req.future.cancel()

        return (req.status.value, req.progress)

    @classmethod
    def load_threaded_get(cls, spec: AssetSpec | None = None, *, timeout: float | None = None) -> Any:
        """Get the loaded resource (blocks if still loading).

        If *spec* is ``None`` returns the result of the most recent request.
        On success the request is removed from tracking.

        Args:
            spec: Source whose result to fetch, or ``None`` for the most
                recent request.
            timeout: Maximum seconds to wait. When omitted, the remaining
                time until the request's deadline is used, or no limit if
                the request has no deadline.

        Raises:
            RuntimeError: If there is no matching request, or the request
                ended in the error state.
            TimeoutError: If the wait timed out.
        """
        # Local import: the module-level imports do not bring this name in.
        from concurrent.futures import CancelledError

        inst = cls.get()
        if spec is None:
            if not inst._requests:
                raise RuntimeError("No pending load requests")
            key = next(reversed(inst._requests))
        else:
            key = _request_key(spec)

        if (req := inst._requests.get(key)) is None:
            raise RuntimeError(f"No load request for: {spec!r}")

        # Work out how long to wait: explicit timeout > submission deadline > None.
        wait: float | None = timeout
        if wait is None and req.deadline is not None:
            wait = max(0.0, req.deadline - time.monotonic())

        if req.future is not None:
            try:
                req.future.result(timeout=wait)
            except FutureTimeoutError as exc:
                # Report the request's own spec: the ``spec`` argument is None
                # when the caller asked for the most recent request.
                req.error = TimeoutError(f"Load of {req.spec!r} timed out after {wait}s")
                req.status = LoadStatus.ERROR
                raise req.error from exc
            except CancelledError:
                # A still-queued future cancelled by the lazy deadline check
                # raises here; the recorded status below carries the failure.
                pass

        if req.status == LoadStatus.ERROR:
            raise RuntimeError(f"Failed to load {req.spec!r}: {req.error}")

        result = req.result
        # Clean up completed request
        del inst._requests[key]
        return result

    @classmethod
    def is_loading(cls) -> bool:
        """Check if any resources are currently loading."""
        inst = cls.get()
        return any(r.status == LoadStatus.LOADING for r in inst._requests.values())

    @classmethod
    def get_progress(cls) -> float:
        """Get overall loading progress across all pending requests (0.0 to 1.0).

        Returns 1.0 when no requests are tracked; otherwise the mean of the
        per-request progress values.
        """
        inst = cls.get()
        if not inst._requests:
            return 1.0
        total = sum(r.progress for r in inst._requests.values())
        return total / len(inst._requests)

    # --- Built-in loader ---

    @staticmethod
    def _load_bytes(spec: AssetSpec) -> bytes:
        """Default loader: resolve *spec* to a path and read raw bytes."""
        return resolve_asset_path(spec).read_bytes()