"""
Background resource loading -- threaded loading with progress tracking.
Provides a Godot-style interface for background resource loading with status
polling, suitable for loading screens and smooth level transitions.
Public API::
from simvx.core.resource_loader import ResourceLoader
from simvx.core import Resource
ResourceLoader.load_threaded_request(Resource("game.assets", "ship.png"))
ResourceLoader.load_threaded_request("textures/ship.png")
while True:
status, progress = ResourceLoader.load_threaded_get_status()
if status == "loaded":
data = ResourceLoader.load_threaded_get()
break
update_loading_bar(progress)
The default loader reads any source (filesystem path, ``Resource``, or
``Traversable``) as raw bytes. Use :meth:`register_loader` to install custom
type-dispatched loaders.
"""
from __future__ import annotations
import logging
import os
import threading
import time
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from enum import StrEnum
from importlib.resources.abc import Traversable
from typing import TYPE_CHECKING, Any, Union
from .asset_resolver import resolve_asset_path
if TYPE_CHECKING:
from .resource import Resource
log = logging.getLogger(__name__)
__all__ = ["ResourceLoader", "LoadStatus", "AssetSpec"]
AssetSpec = Union[str, os.PathLike, "Resource", Traversable]
[docs]
class LoadStatus(StrEnum):
"""Status of a background load request."""
IDLE = "idle"
LOADING = "loading"
LOADED = "loaded"
ERROR = "error"
CANCELLED = "cancelled"
def _request_key(spec: AssetSpec) -> str:
"""Stable string key used to deduplicate load requests."""
from .resource import Resource
if isinstance(spec, Resource):
return f"resource:{spec.package}:{spec.name}"
if isinstance(spec, Traversable) and not isinstance(spec, (str, os.PathLike)):
return f"traversable:{spec!s}"
if isinstance(spec, (str, os.PathLike)):
return f"path:{os.fspath(spec)}"
raise TypeError(
f"ResourceLoader expects str | os.PathLike | Resource | Traversable, got {type(spec).__name__}"
)
class _LoadRequest:
"""Internal tracking for a single load request."""
__slots__ = (
"spec",
"future",
"status",
"progress",
"result",
"error",
"loader",
"deadline",
"cancelled",
)
def __init__(self, spec: AssetSpec, loader: Callable[[AssetSpec], Any]):
self.spec = spec
self.loader = loader
self.status = LoadStatus.LOADING
self.progress: float = 0.0
self.result: Any = None
self.error: Exception | None = None
self.future: Future | None = None
self.deadline: float | None = None
self.cancelled: bool = False
[docs]
class ResourceLoader:
"""Singleton threaded resource loader with progress tracking.
Pool size and a default per-load timeout can be configured via
:meth:`configure` before first use. Call :meth:`reset` first if the
singleton has already been created.
"""
_instance: ResourceLoader | None = None
_lock = threading.Lock()
_default_max_workers: int = 2
_default_timeout: float | None = None
def __init__(self):
self._pool = ThreadPoolExecutor(
max_workers=self._default_max_workers,
thread_name_prefix="resource_loader",
)
self._requests: dict[str, _LoadRequest] = {}
# Type-dispatched custom loaders. Keys are types; values are callables.
self._type_loaders: dict[type, Callable[[Any], Any]] = {}
self._default_timeout_override: float | None = self._default_timeout
[docs]
@classmethod
def get(cls) -> ResourceLoader:
"""Return the singleton instance."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
[docs]
@classmethod
def reset(cls):
"""Reset the singleton (for tests). Shuts down the thread pool."""
if cls._instance is not None:
cls._instance._pool.shutdown(wait=False)
cls._instance = None
[docs]
def register_loader(self, source_type: type, loader: Callable[[Any], Any]) -> None:
"""Register a custom loader keyed by *source_type*.
``isinstance(spec, source_type)`` selects the loader. Tested in
registration order, with the default file-bytes loader as fallback.
"""
if not isinstance(source_type, type):
raise TypeError(f"register_loader source_type must be a type, got {type(source_type).__name__}")
self._type_loaders[source_type] = loader
def _select_loader(self, spec: AssetSpec) -> Callable[[Any], Any]:
for source_type, loader in self._type_loaders.items():
if isinstance(spec, source_type):
return loader
return self._load_bytes
[docs]
@classmethod
def load_threaded_request(
cls,
spec: AssetSpec,
loader: Callable[[Any], Any] | None = None,
*,
timeout: float | None = None,
) -> None:
"""Start loading a resource in the background.
Args:
spec: A filesystem path string / :class:`~os.PathLike`, a
:class:`~simvx.core.Resource`, or an
:class:`importlib.resources.abc.Traversable`.
loader: Optional custom loader. Receives *spec* directly. If
``None``, a registered type-loader is selected, falling back
to a bytes reader.
timeout: Optional per-request timeout in seconds. Overrides the
configured default.
"""
key = _request_key(spec)
inst = cls.get()
if (existing := inst._requests.get(key)) is not None and existing.status == LoadStatus.LOADING:
return # already loading — dedup
if loader is None:
loader = inst._select_loader(spec)
req = _LoadRequest(spec, loader)
effective_timeout = timeout if timeout is not None else inst._default_timeout_override
if effective_timeout is not None:
req.deadline = time.monotonic() + effective_timeout
inst._requests[key] = req
def _do_load():
try:
result = req.loader(req.spec)
if req.cancelled:
return # discard result, status handled by cancel()
req.result = result
req.progress = 1.0
req.status = LoadStatus.LOADED
except Exception as e:
if req.cancelled:
return
req.error = e
req.status = LoadStatus.ERROR
log.error("resource_loader: failed to load %r: %s", req.spec, e)
req.future = inst._pool.submit(_do_load)
[docs]
@classmethod
def cancel(cls, spec: AssetSpec) -> bool:
"""Cancel a pending load request. See class docstring for semantics."""
key = _request_key(spec)
inst = cls.get()
req = inst._requests.get(key)
if req is None:
return False
req.cancelled = True
cancelled = False
if req.future is not None:
cancelled = req.future.cancel()
req.status = LoadStatus.CANCELLED
if cancelled:
inst._requests.pop(key, None)
return cancelled
[docs]
@classmethod
def load_threaded_get_status(cls, spec: AssetSpec | None = None) -> tuple[str, float]:
"""Check the status of a background load.
If *spec* is ``None`` returns the most recent request's status.
"""
inst = cls.get()
if spec is None:
if not inst._requests:
return ("idle", 0.0)
req = next(reversed(inst._requests.values()))
else:
key = _request_key(spec)
if (req := inst._requests.get(key)) is None:
return ("idle", 0.0)
# Enforce deadline lazily.
if (
req.status == LoadStatus.LOADING
and req.deadline is not None
and time.monotonic() >= req.deadline
):
req.error = TimeoutError(f"Load of {req.spec!r} exceeded deadline")
req.status = LoadStatus.ERROR
if req.future is not None:
req.future.cancel()
return (req.status.value, req.progress)
[docs]
@classmethod
def load_threaded_get(cls, spec: AssetSpec | None = None, *, timeout: float | None = None) -> Any:
"""Get the loaded resource (blocks if still loading).
If *spec* is ``None`` returns the result of the most recent request.
"""
inst = cls.get()
if spec is None:
if not inst._requests:
raise RuntimeError("No pending load requests")
key = next(reversed(inst._requests))
else:
key = _request_key(spec)
if (req := inst._requests.get(key)) is None:
raise RuntimeError(f"No load request for: {spec!r}")
# Work out how long to wait: explicit timeout > submission deadline > None.
wait: float | None = timeout
if wait is None and req.deadline is not None:
wait = max(0.0, req.deadline - time.monotonic())
if req.future is not None:
try:
req.future.result(timeout=wait)
except FutureTimeoutError as exc:
req.error = TimeoutError(f"Load of {spec!r} timed out after {wait}s")
req.status = LoadStatus.ERROR
raise req.error from exc
if req.status == LoadStatus.ERROR:
raise RuntimeError(f"Failed to load {spec!r}: {req.error}")
result = req.result
# Clean up completed request
del inst._requests[key]
return result
[docs]
@classmethod
def is_loading(cls) -> bool:
"""Check if any resources are currently loading."""
inst = cls.get()
return any(r.status == LoadStatus.LOADING for r in inst._requests.values())
[docs]
@classmethod
def get_progress(cls) -> float:
"""Get overall loading progress across all pending requests (0.0 to 1.0)."""
inst = cls.get()
if not inst._requests:
return 1.0
total = sum(r.progress for r in inst._requests.values())
return total / len(inst._requests)
# --- Built-in loader ---
@staticmethod
def _load_bytes(spec: AssetSpec) -> bytes:
"""Default loader: resolve *spec* to a path and read raw bytes."""
return resolve_asset_path(spec).read_bytes()