Source code for simvx.graphics.materials.texture

"""Texture loading and bindless descriptor array management.

One canonical ``TextureManager`` serves both the Vulkan and web backends —
the only thing that differs between them is the ``_TextureRegistrar`` that
turns RGBA pixels into a backend-specific texture id. On the web path the
manager also retains pixel data so ``prepare_2d_overlays`` can re-ship it
over the drain channel; on the desktop path that's skipped (``retain_pixels=False``)
to avoid doubling VRAM.
"""

import hashlib
import io
import logging
import struct
import zlib
from pathlib import Path
from typing import Any, Protocol

import numpy as np

__all__ = ["TextureManager"]

log = logging.getLogger(__name__)

# Accepted texture source types at the public API surface. Kept as a tuple
# so it doubles as an isinstance() argument.
TextureSource = str | Path | bytes | np.ndarray


class _TextureRegistrar(Protocol):
    """Minimal contract every rendering backend exposes for pixel uploads.

    Satisfied by ``Engine`` (Vulkan), ``WebRenderer`` (WebGPU drain channel),
    and the tiny ``_PendingTextureRenderer`` shim used by 2D-only web exports.
    """

    def upload_texture_pixels(self, pixels: np.ndarray, width: int, height: int) -> int: ...


def _decode_png_rgba(data: bytes) -> tuple[np.ndarray, int, int] | None:
    """Pure-python decoder for filter-0 RGBA PNGs — used when PIL isn't available
    (e.g. slim Pyodide installs). Returns None if the bytes aren't such a PNG.
    """
    if data[:8] != b"\x89PNG\r\n\x1a\n":
        return None
    pos = 8
    width = height = 0
    idat_chunks: list[bytes] = []
    while pos < len(data):
        (length,) = struct.unpack_from(">I", data, pos)
        ctype = data[pos + 4:pos + 8]
        chunk_data = data[pos + 8:pos + 8 + length]
        pos += 12 + length
        if ctype == b"IHDR":
            width, height = struct.unpack_from(">II", chunk_data, 0)
        elif ctype == b"IDAT":
            idat_chunks.append(chunk_data)
        elif ctype == b"IEND":
            break
    if not idat_chunks or width == 0:
        return None
    raw = zlib.decompress(b"".join(idat_chunks))
    stride = 1 + width * 4
    pixels = np.empty((height, width, 4), dtype=np.uint8)
    for y in range(height):
        row_start = y * stride + 1  # skip filter byte
        pixels[y] = np.frombuffer(raw[row_start:row_start + width * 4], dtype=np.uint8).reshape(width, 4)
    return pixels, width, height


def _load_pixels_from_bytes(data: bytes) -> tuple[np.ndarray, int, int]:
    """Decode PNG/JPG bytes to RGBA uint8 pixels. Prefers PIL when installed,
    falls back to the pure-python PNG decoder otherwise.
    """
    try:
        from PIL import Image  # type: ignore[import-not-found]
    except ImportError:
        decoded = _decode_png_rgba(data)
        if decoded is None:
            raise ValueError("Texture bytes are not a filter-0 RGBA PNG and PIL is unavailable")  # noqa: B904
        return decoded
    img = Image.open(io.BytesIO(data)).convert("RGBA")
    pixels = np.ascontiguousarray(np.array(img, dtype=np.uint8))
    return pixels, img.width, img.height


def _load_pixels_from_path(path: Path) -> tuple[np.ndarray, int, int]:
    """Decode a texture on disk. PIL is required for JPEG; for PNG we fall
    back to the pure-python decoder so headless / Pyodide installs work.
    """
    try:
        from PIL import Image  # type: ignore[import-not-found]
    except ImportError:
        return _load_pixels_from_bytes(path.read_bytes())
    img = Image.open(str(path)).convert("RGBA")
    pixels = np.ascontiguousarray(np.array(img, dtype=np.uint8))
    return pixels, img.width, img.height


def _release_texture(manager_ref: Any, idx: int,
                     cache_key: str, source_id: int) -> None:
    """Top-level weakref.finalize callback — delegates to manager.release."""
    manager = manager_ref()
    if manager is None:
        return
    try:
        manager.release(idx, cache_key=cache_key, source_id=source_id)
    except Exception:
        log.exception("TextureManager.release failed during finalize")


[docs] class TextureManager: """Loads textures via a backend registrar and caches by source identity. Desktop Vulkan: ``TextureManager(engine)`` — pixels uploaded and forgotten. Web: ``TextureManager(renderer, retain_pixels=True)`` — pixels retained so ``prepare_2d_overlays`` can re-ship them over the drain channel on demand. """ def __init__(self, registrar: _TextureRegistrar, *, retain_pixels: bool = False) -> None: self._registrar = registrar self._retain_pixels = retain_pixels self._cache: dict[str, int] = {} # source key → tex_id self._sizes: dict[int, tuple[int, int]] = {} # tex_id → (w, h) self._pixels_by_id: dict[int, np.ndarray] = {} # only populated when retain_pixels # weakref.finalize handles, keyed by tex_id, so array-sourced textures # reclaim their backend slot when the owning ndarray is GC'd. Only # populated for ``load_from_array`` — file/bytes sources are content- # hashed and outlive the caller's reference intentionally. self._finalizers: dict[int, Any] = {} # ------------------------------------------------------------------ # Canonical entry point # ------------------------------------------------------------------
[docs] def resolve(self, source: TextureSource | None) -> int: """Resolve any supported texture source to a backend texture index. Returns -1 for ``None``, empty strings, or sources that cannot be resolved (e.g. a path that does not exist). All callers that accept a user-provided ``texture`` property should go through this method. Supported sources: * ``str`` / ``pathlib.Path`` — file on disk (PNG / JPG / ...) * ``bytes`` — raw encoded image data (PNG / JPG) * ``numpy.ndarray`` — RGBA uint8 pixels, shape ``(H, W, 4)`` """ if source is None: return -1 if isinstance(source, np.ndarray): return self.load_from_array(source) if isinstance(source, bytes | bytearray | memoryview): return self.load_from_bytes(bytes(source)) if isinstance(source, str) and source == "": return -1 return self.load_if_exists(source)
# ------------------------------------------------------------------ # Individual loaders # ------------------------------------------------------------------
[docs] def load(self, path: str | Path) -> int: """Load a texture from disk. Cached by resolved path.""" resolved = Path(path).resolve() key = "path:" + str(resolved) if key in self._cache: return self._cache[key] pixels, width, height = _load_pixels_from_path(resolved) idx = self._register(pixels, width, height) self._cache[key] = idx log.debug("TextureManager: %s → index %d", resolved.name, idx) return idx
[docs] def load_from_bytes(self, data: bytes) -> int: """Load a texture from in-memory image bytes. Cached by content hash.""" key = "bytes:" + hashlib.sha256(data).hexdigest() if key in self._cache: return self._cache[key] try: pixels, width, height = _load_pixels_from_bytes(data) except ValueError: log.warning("Failed to decode texture from bytes") return -1 idx = self._register(pixels, width, height) self._cache[key] = idx log.debug("TextureManager: embedded %dx%d → index %d", width, height, idx) return idx
[docs] def load_from_array(self, pixels: np.ndarray) -> int: """Upload an RGBA uint8 ndarray of shape ``(H, W, 4)``. Cached by the array's object identity plus shape/dtype — passing the same ndarray instance returns the same index. """ if pixels.ndim != 3 or pixels.shape[2] != 4: raise ValueError(f"Expected RGBA pixels with shape (H, W, 4); got {pixels.shape}") if pixels.dtype != np.uint8: raise ValueError(f"Expected uint8 pixels; got {pixels.dtype}") h, w = pixels.shape[:2] key = f"array:{id(pixels)}:{w}x{h}:{pixels.dtype}" if key in self._cache: return self._cache[key] contig = np.ascontiguousarray(pixels) idx = self._register(contig, w, h) self._cache[key] = idx # Hook the lifetime of the *source* ndarray so the texture slot is # reclaimed when the caller's reference drops. Engine.unregister_texture # pushes the slot back onto its free list; TextureManager.release drops # its own bookkeeping. The finalizer holds only a weakref to self so # the manager itself can still be GC'd. import weakref if idx not in self._finalizers: self._finalizers[idx] = weakref.finalize( pixels, _release_texture, weakref.ref(self), idx, key, id(pixels), ) return idx
[docs] def load_if_exists(self, path: str | Path) -> int: """Load a texture if the file exists. Returns -1 if not found.""" p = Path(path) if not p.exists(): return -1 return self.load(p)
# ------------------------------------------------------------------ # Queries # ------------------------------------------------------------------
[docs] def get_texture_size(self, tex_idx: int) -> tuple[int, int]: """Return (width, height) for a loaded texture index. (0, 0) if unknown.""" return self._sizes.get(tex_idx, (0, 0))
[docs] def get_pixels(self, tex_id: int) -> np.ndarray | None: """Return retained RGBA pixels for ``tex_id``, or None. Only populated when the manager was constructed with ``retain_pixels=True``. Used by the web runtime to re-ship 2D overlay pixels over the drain channel without the browser having to fetch them back out. """ return self._pixels_by_id.get(tex_id)
[docs] @property def count(self) -> int: """Number of unique textures loaded.""" return len(self._cache)
[docs] def destroy(self) -> None: """Clear all caches (GPU resources are owned by the backend).""" self._cache.clear() self._sizes.clear() self._pixels_by_id.clear()
# ------------------------------------------------------------------ # Internals # ------------------------------------------------------------------ def _register(self, pixels: np.ndarray, width: int, height: int) -> int: idx = self._registrar.upload_texture_pixels(pixels, width, height) self._sizes[idx] = (width, height) if self._retain_pixels: self._pixels_by_id[idx] = pixels return idx
[docs] def release(self, idx: int, cache_key: str | None = None, source_id: int | None = None) -> None: """Reclaim a texture slot + drop cache bookkeeping. Called by the weakref.finalize attached in ``load_from_array`` when the source ndarray is GC'd. Backend unregister is delegated to the registrar when it exposes ``unregister_texture`` (desktop Engine); web registrars may opt out. """ self._sizes.pop(idx, None) self._pixels_by_id.pop(idx, None) if cache_key is not None: self._cache.pop(cache_key, None) unreg = getattr(self._registrar, "unregister_texture", None) if callable(unreg): try: unreg(idx) except Exception: log.exception("unregister_texture(%d) failed", idx) self._finalizers.pop(idx, None)