"""Texture loading and bindless descriptor array management.
One canonical ``TextureManager`` serves both the Vulkan and web backends —
the only thing that differs between them is the ``_TextureRegistrar`` that
turns RGBA pixels into a backend-specific texture id. On the web path the
manager also retains pixel data so ``prepare_2d_overlays`` can re-ship it
over the drain channel; on the desktop path that's skipped (``retain_pixels=False``)
to avoid doubling VRAM.
"""
import hashlib
import io
import logging
import struct
import zlib
from pathlib import Path
from typing import Any, Protocol
import numpy as np
__all__ = ["TextureManager"]
log = logging.getLogger(__name__)
# Accepted texture source types at the public API surface. Declared as a
# ``X | Y`` union (not a tuple) so it can be used directly in annotations
# such as ``TextureSource | None``.
TextureSource = str | Path | bytes | np.ndarray
class _TextureRegistrar(Protocol):
    """Structural interface a rendering backend implements to accept pixel uploads.

    Satisfied by ``Engine`` (Vulkan), ``WebRenderer`` (WebGPU drain channel),
    and the tiny ``_PendingTextureRenderer`` shim used by 2D-only web exports.
    """

    def upload_texture_pixels(
        self,
        pixels: np.ndarray,
        width: int,
        height: int,
    ) -> int:
        """Hand ``pixels`` (``width`` x ``height``) to the backend; return its texture id."""
        ...
def _decode_png_rgba(data: bytes) -> tuple[np.ndarray, int, int] | None:
"""Pure-python decoder for filter-0 RGBA PNGs — used when PIL isn't available
(e.g. slim Pyodide installs). Returns None if the bytes aren't such a PNG.
"""
if data[:8] != b"\x89PNG\r\n\x1a\n":
return None
pos = 8
width = height = 0
idat_chunks: list[bytes] = []
while pos < len(data):
(length,) = struct.unpack_from(">I", data, pos)
ctype = data[pos + 4:pos + 8]
chunk_data = data[pos + 8:pos + 8 + length]
pos += 12 + length
if ctype == b"IHDR":
width, height = struct.unpack_from(">II", chunk_data, 0)
elif ctype == b"IDAT":
idat_chunks.append(chunk_data)
elif ctype == b"IEND":
break
if not idat_chunks or width == 0:
return None
raw = zlib.decompress(b"".join(idat_chunks))
stride = 1 + width * 4
pixels = np.empty((height, width, 4), dtype=np.uint8)
for y in range(height):
row_start = y * stride + 1 # skip filter byte
pixels[y] = np.frombuffer(raw[row_start:row_start + width * 4], dtype=np.uint8).reshape(width, 4)
return pixels, width, height
def _load_pixels_from_bytes(data: bytes) -> tuple[np.ndarray, int, int]:
    """Decode PNG/JPG bytes to RGBA uint8 pixels.

    Prefers PIL when installed and falls back to the pure-python PNG decoder
    otherwise.

    Args:
        data: Encoded image file contents.

    Returns:
        ``(pixels, width, height)`` with ``pixels`` a C-contiguous
        ``(height, width, 4)`` uint8 array.

    Raises:
        ValueError: If the bytes cannot be decoded. Both decoding paths report
            failure with this one exception type so callers need a single
            ``except`` clause whether or not PIL is installed.
    """
    try:
        from PIL import Image  # type: ignore[import-not-found]
    except ImportError:
        Image = None

    if Image is None:
        decoded = _decode_png_rgba(data)
        if decoded is None:
            raise ValueError("Texture bytes are not a filter-0 RGBA PNG and PIL is unavailable")
        return decoded

    try:
        img = Image.open(io.BytesIO(data)).convert("RGBA")
    except OSError as err:
        # PIL signals unrecognised or truncated image data with OSError
        # (UnidentifiedImageError is a subclass); normalise it to the same
        # ValueError the fallback path raises.
        raise ValueError("Texture bytes could not be decoded as an image") from err
    pixels = np.ascontiguousarray(np.array(img, dtype=np.uint8))
    return pixels, img.width, img.height
def _load_pixels_from_path(path: Path) -> tuple[np.ndarray, int, int]:
    """Decode a texture on disk to RGBA uint8 pixels.

    PIL is used when installed. Without PIL the file is read into memory and
    handed to ``_load_pixels_from_bytes`` so headless / Pyodide installs work.

    Args:
        path: Location of the image file.

    Returns:
        ``(pixels, width, height)`` with ``pixels`` a C-contiguous
        ``(height, width, 4)`` uint8 array.
    """
    try:
        from PIL import Image  # type: ignore[import-not-found]
    except ImportError:
        return _load_pixels_from_bytes(path.read_bytes())

    # Image.open is lazy and keeps the file handle; the context manager
    # closes it deterministically, including when convert() fails part-way.
    # convert() returns a fully loaded copy that outlives the handle.
    with Image.open(str(path)) as src:
        img = src.convert("RGBA")
    pixels = np.ascontiguousarray(np.array(img, dtype=np.uint8))
    return pixels, img.width, img.height
def _release_texture(manager_ref: Any, idx: int,
                     cache_key: str, source_id: int) -> None:
    """Module-level ``weakref.finalize`` callback that forwards to ``manager.release``.

    ``manager_ref`` is called to obtain the manager; when it yields ``None``
    there is nothing to do. Any exception raised by ``release`` is logged
    rather than propagated.
    """
    target = manager_ref()
    if target is not None:
        try:
            target.release(idx, cache_key=cache_key, source_id=source_id)
        except Exception:
            log.exception("TextureManager.release failed during finalize")
[docs]
class TextureManager:
"""Loads textures via a backend registrar and caches by source identity.
Desktop Vulkan: ``TextureManager(engine)`` — pixels uploaded and forgotten.
Web: ``TextureManager(renderer, retain_pixels=True)`` — pixels retained so
``prepare_2d_overlays`` can re-ship them over the drain channel on demand.
"""
def __init__(self, registrar: _TextureRegistrar, *, retain_pixels: bool = False) -> None:
self._registrar = registrar
self._retain_pixels = retain_pixels
self._cache: dict[str, int] = {} # source key → tex_id
self._sizes: dict[int, tuple[int, int]] = {} # tex_id → (w, h)
self._pixels_by_id: dict[int, np.ndarray] = {} # only populated when retain_pixels
# weakref.finalize handles, keyed by tex_id, so array-sourced textures
# reclaim their backend slot when the owning ndarray is GC'd. Only
# populated for ``load_from_array`` — file/bytes sources are content-
# hashed and outlive the caller's reference intentionally.
self._finalizers: dict[int, Any] = {}
# ------------------------------------------------------------------
# Canonical entry point
# ------------------------------------------------------------------
[docs]
def resolve(self, source: TextureSource | None) -> int:
"""Resolve any supported texture source to a backend texture index.
Returns -1 for ``None``, empty strings, or sources that cannot be
resolved (e.g. a path that does not exist). All callers that accept
a user-provided ``texture`` property should go through this method.
Supported sources:
* ``str`` / ``pathlib.Path`` — file on disk (PNG / JPG / ...)
* ``bytes`` — raw encoded image data (PNG / JPG)
* ``numpy.ndarray`` — RGBA uint8 pixels, shape ``(H, W, 4)``
"""
if source is None:
return -1
if isinstance(source, np.ndarray):
return self.load_from_array(source)
if isinstance(source, bytes | bytearray | memoryview):
return self.load_from_bytes(bytes(source))
if isinstance(source, str) and source == "":
return -1
return self.load_if_exists(source)
# ------------------------------------------------------------------
# Individual loaders
# ------------------------------------------------------------------
[docs]
def load(self, path: str | Path) -> int:
"""Load a texture from disk. Cached by resolved path."""
resolved = Path(path).resolve()
key = "path:" + str(resolved)
if key in self._cache:
return self._cache[key]
pixels, width, height = _load_pixels_from_path(resolved)
idx = self._register(pixels, width, height)
self._cache[key] = idx
log.debug("TextureManager: %s → index %d", resolved.name, idx)
return idx
[docs]
def load_from_bytes(self, data: bytes) -> int:
"""Load a texture from in-memory image bytes. Cached by content hash."""
key = "bytes:" + hashlib.sha256(data).hexdigest()
if key in self._cache:
return self._cache[key]
try:
pixels, width, height = _load_pixels_from_bytes(data)
except ValueError:
log.warning("Failed to decode texture from bytes")
return -1
idx = self._register(pixels, width, height)
self._cache[key] = idx
log.debug("TextureManager: embedded %dx%d → index %d", width, height, idx)
return idx
[docs]
def load_from_array(self, pixels: np.ndarray) -> int:
"""Upload an RGBA uint8 ndarray of shape ``(H, W, 4)``.
Cached by the array's object identity plus shape/dtype — passing the
same ndarray instance returns the same index.
"""
if pixels.ndim != 3 or pixels.shape[2] != 4:
raise ValueError(f"Expected RGBA pixels with shape (H, W, 4); got {pixels.shape}")
if pixels.dtype != np.uint8:
raise ValueError(f"Expected uint8 pixels; got {pixels.dtype}")
h, w = pixels.shape[:2]
key = f"array:{id(pixels)}:{w}x{h}:{pixels.dtype}"
if key in self._cache:
return self._cache[key]
contig = np.ascontiguousarray(pixels)
idx = self._register(contig, w, h)
self._cache[key] = idx
# Hook the lifetime of the *source* ndarray so the texture slot is
# reclaimed when the caller's reference drops. Engine.unregister_texture
# pushes the slot back onto its free list; TextureManager.release drops
# its own bookkeeping. The finalizer holds only a weakref to self so
# the manager itself can still be GC'd.
import weakref
if idx not in self._finalizers:
self._finalizers[idx] = weakref.finalize(
pixels, _release_texture,
weakref.ref(self), idx, key, id(pixels),
)
return idx
[docs]
def load_if_exists(self, path: str | Path) -> int:
"""Load a texture if the file exists. Returns -1 if not found."""
p = Path(path)
if not p.exists():
return -1
return self.load(p)
# ------------------------------------------------------------------
# Queries
# ------------------------------------------------------------------
[docs]
def get_texture_size(self, tex_idx: int) -> tuple[int, int]:
"""Return (width, height) for a loaded texture index. (0, 0) if unknown."""
return self._sizes.get(tex_idx, (0, 0))
[docs]
def get_pixels(self, tex_id: int) -> np.ndarray | None:
"""Return retained RGBA pixels for ``tex_id``, or None.
Only populated when the manager was constructed with ``retain_pixels=True``.
Used by the web runtime to re-ship 2D overlay pixels over the drain
channel without the browser having to fetch them back out.
"""
return self._pixels_by_id.get(tex_id)
[docs]
@property
def count(self) -> int:
"""Number of unique textures loaded."""
return len(self._cache)
[docs]
def destroy(self) -> None:
"""Clear all caches (GPU resources are owned by the backend)."""
self._cache.clear()
self._sizes.clear()
self._pixels_by_id.clear()
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _register(self, pixels: np.ndarray, width: int, height: int) -> int:
idx = self._registrar.upload_texture_pixels(pixels, width, height)
self._sizes[idx] = (width, height)
if self._retain_pixels:
self._pixels_by_id[idx] = pixels
return idx
[docs]
def release(self, idx: int, cache_key: str | None = None,
source_id: int | None = None) -> None:
"""Reclaim a texture slot + drop cache bookkeeping.
Called by the weakref.finalize attached in ``load_from_array`` when
the source ndarray is GC'd. Backend unregister is delegated to the
registrar when it exposes ``unregister_texture`` (desktop Engine);
web registrars may opt out.
"""
self._sizes.pop(idx, None)
self._pixels_by_id.pop(idx, None)
if cache_key is not None:
self._cache.pop(cache_key, None)
unreg = getattr(self._registrar, "unregister_texture", None)
if callable(unreg):
try:
unreg(idx)
except Exception:
log.exception("unregister_texture(%d) failed", idx)
self._finalizers.pop(idx, None)