Source code for simvx.graphics.renderer.web3d

"""WebGPU 3D renderer — collects submissions and serializes to binary via Scene3DSerializer.

Implements the Renderer ABC without any Vulkan dependency. Resource uploads
(meshes + material textures) travel on the separate drain channel to the
browser's Renderer3D, matching the 2D overlay path and the Vulkan backend.
Frame binaries carry only per-frame state (viewports, lights, materials,
draw groups, post-process).
"""

import logging
from typing import Any

import numpy as np

from .._types import ALPHA_OPAQUE, LIGHT_DTYPE, MATERIAL_DTYPE, VERTEX_DTYPE, MeshHandle
from ._base import Renderer
from .tile_types import TILE_INSTANCE_DTYPE
from .transparency import compute_sort_key, extract_camera_position
from .viewport_manager import ViewportManager

log = logging.getLogger(__name__)

__all__ = ["WebRenderer3D"]

# Default tex_id 0 is the shared 1×1 white texture baked into the JS renderer;
# allocated ids start at 1 so any tex_id <=0 means "no texture, use default".


class WebRenderer3D(Renderer):
    """WebGPU 3D renderer — serializes scene state + streams uploads via drain channel.

    Scene submissions are collected per frame and turned into one binary blob
    by :meth:`serialize_frame`. Mesh and texture uploads are queued separately
    and handed out through :meth:`drain_resources`.
    """

    # Draw-pass identifiers written into each serialized draw group.
    _PASS_OPAQUE = 0
    _PASS_DOUBLE_SIDED = 1
    _PASS_TRANSPARENT = 2

    def __init__(self, width: int, height: int) -> None:
        """Create an empty renderer for a ``width`` x ``height`` target."""
        self._width = width
        self._height = height
        # Per-frame instance submissions: (mesh, transform, material_id, viewport_id).
        self._instances: list[tuple[MeshHandle, np.ndarray, int, int]] = []
        # Per-frame tilemap layer submissions: (tile_data, tileset_tex_id, tile_size).
        self._tilemap_layers: list[tuple[np.ndarray, int, tuple[float, float]]] = []
        self._materials: np.ndarray = np.empty(0, dtype=MATERIAL_DTYPE)
        self._lights: np.ndarray = np.empty(0, dtype=LIGHT_DTYPE)
        self._next_mesh_id = 0
        # Texture ids start at 1; id 0 is left for the default texture.
        self._next_tex_id = 1
        self._mesh_data: dict[int, tuple[np.ndarray, np.ndarray]] = {}
        # Drain queue: list of (kind, id, payload_bytes) TLV tuples. WebApp
        # flushes this into the unified drain_resources() stream each tick.
        self._resource_queue: list[tuple[int, int, bytes]] = []
        self._frame_id = 0
        self.viewport_manager = ViewportManager()

        # Post-process state (bloom + distance/height fog)
        self._bloom_enabled = False
        self._bloom_threshold = 1.0
        self._bloom_intensity = 0.8
        self._bloom_soft_knee = 0.5
        self._fog_enabled = False
        self._fog_mode = 1  # 0=linear, 1=exponential, 2=exponential_squared
        self._fog_density = 0.02
        self._fog_start = 10.0
        self._fog_end = 100.0
        self._fog_colour: tuple[float, float, float, float] = (0.5, 0.6, 0.7, 1.0)
        self._fog_height = 0.0
        self._fog_height_density = 0.0

    # -- Frame lifecycle --

    def begin_frame(self) -> None:
        """Discard the previous frame's instance and tilemap submissions."""
        self._instances.clear()
        self._tilemap_layers.clear()

    def pre_render(self, cmd: Any) -> None:
        """No-op; ``cmd`` is unused by this backend."""

    def render(self, cmd: Any) -> None:
        """No-op; ``cmd`` is unused by this backend."""

    def resize(self, width: int, height: int) -> None:
        """Record the new target size used for the fallback viewport and captures."""
        self._width, self._height = width, height

    def destroy(self) -> None:
        """Drop all retained mesh data, queued uploads and pending submissions."""
        self._mesh_data.clear()
        self._resource_queue.clear()
        # Also release per-frame buffers so a destroyed renderer holds no scene data.
        self._instances.clear()
        self._tilemap_layers.clear()

    # -- Scene submissions --

    def submit_instance(
        self,
        mesh_handle: MeshHandle,
        transform: np.ndarray,
        material_id: int = 0,
        viewport_id: int = 0,
    ) -> None:
        """Queue one mesh instance; the transform is copied as float32."""
        self._instances.append(
            (mesh_handle, np.array(transform, dtype=np.float32), material_id, viewport_id)
        )

    def submit_multimesh(
        self,
        mesh_handle: MeshHandle,
        transforms: np.ndarray,
        material_id: int = 0,
        material_ids: np.ndarray | None = None,
        viewport_id: int = 0,
        count: int = 0,
    ) -> None:
        """Queue many instances of one mesh.

        Args:
            mesh_handle: Mesh to draw.
            transforms: Sequence of per-instance transforms.
            material_id: Material used for instances without a per-instance id.
            material_ids: Optional per-instance material ids; instances past
                its end fall back to ``material_id``.
            viewport_id: Viewport id stored with every instance.
            count: Number of leading transforms to submit. Values ``<= 0``
                submit all of them; values larger than ``len(transforms)``
                are clamped instead of raising ``IndexError``.
        """
        t = np.asarray(transforms, dtype=np.float32)
        available = len(t)
        n = min(count, available) if count > 0 else available
        per_instance = 0 if material_ids is None else len(material_ids)
        for i in range(n):
            mid = int(material_ids[i]) if i < per_instance else material_id
            self._instances.append((mesh_handle, t[i], mid, viewport_id))

    def submit_skinned_instance(
        self,
        mesh_handle: MeshHandle,
        transform: np.ndarray,
        material_id: int,
        joint_matrices: np.ndarray,
    ) -> None:
        """Queue a skinned mesh as a plain instance on viewport 0.

        ``joint_matrices`` is accepted for interface compatibility but is not
        used by this method.
        """
        self._instances.append(
            (mesh_handle, np.array(transform, dtype=np.float32), material_id, 0)
        )

    def set_materials(self, materials: np.ndarray) -> None:
        """Replace the material table (converted to ``MATERIAL_DTYPE``)."""
        self._materials = np.asarray(materials, dtype=MATERIAL_DTYPE)

    def set_lights(self, lights: np.ndarray) -> None:
        """Replace the light table (converted to ``LIGHT_DTYPE``)."""
        self._lights = np.asarray(lights, dtype=LIGHT_DTYPE)

    def submit_text(
        self,
        text: str,
        x: float,
        y: float,
        size: float,
        colour: tuple[float, float, float, float],
        **kwargs: Any,
    ) -> None:
        """No-op; text submissions are ignored by this renderer."""

    def submit_particles(self, particle_data: np.ndarray) -> None:
        """No-op; particle submissions are ignored by this renderer."""

    def submit_light2d(self, **kwargs: Any) -> None:
        """No-op; 2D light submissions are ignored by this renderer."""

    def submit_tilemap_layer(
        self,
        tile_data: np.ndarray,
        tileset_tex_id: int,
        tile_size: tuple[float, float],
    ) -> None:
        """Queue a tilemap layer for serialization in the current frame.

        Mirrors the Vulkan ``TileMapPass.submit_layer`` signature so
        ``SceneAdapter`` can dispatch to either backend uniformly. Empty
        layers are skipped.
        """
        if len(tile_data) == 0:
            return
        tiles = np.ascontiguousarray(tile_data, dtype=TILE_INSTANCE_DTYPE)
        self._tilemap_layers.append(
            (tiles, int(tileset_tex_id), (float(tile_size[0]), float(tile_size[1])))
        )

    def set_post_process(
        self,
        bloom_enabled: bool = False,
        bloom_threshold: float = 1.0,
        bloom_intensity: float = 0.8,
        bloom_soft_knee: float = 0.5,
        fog_enabled: bool = False,
        fog_mode: int = 1,
        fog_density: float = 0.02,
        fog_start: float = 10.0,
        fog_end: float = 100.0,
        fog_colour: tuple[float, float, float, float] = (0.5, 0.6, 0.7, 1.0),
        fog_height: float = 0.0,
        fog_height_density: float = 0.0,
    ) -> None:
        """Update post-processing settings (synced from WorldEnvironment).

        Every setting is overwritten on each call; omitted arguments reset the
        corresponding value to its default.
        """
        self._bloom_enabled = bloom_enabled
        self._bloom_threshold = bloom_threshold
        self._bloom_intensity = bloom_intensity
        self._bloom_soft_knee = bloom_soft_knee
        self._fog_enabled = fog_enabled
        self._fog_mode = fog_mode
        self._fog_density = fog_density
        self._fog_start = fog_start
        self._fog_end = fog_end
        self._fog_colour = fog_colour
        self._fog_height = fog_height
        self._fog_height_density = fog_height_density

    # -- Resource management --

    def register_mesh(self, vertices: np.ndarray, indices: np.ndarray) -> MeshHandle:
        """Assign a mesh id, keep the geometry and enqueue it for upload.

        Returns:
            A ``MeshHandle`` carrying the new id, the vertex/index counts and a
            bounding radius equal to the largest vertex-position norm
            (``0.0`` for an empty mesh).
        """
        from ..web.resource_kinds import pack_mesh

        verts = np.asarray(vertices, dtype=VERTEX_DTYPE)
        idxs = np.asarray(indices, dtype=np.uint32)
        mesh_id = self._next_mesh_id
        self._next_mesh_id += 1
        self._mesh_data[mesh_id] = (verts, idxs)
        self._resource_queue.append(
            pack_mesh(mesh_id, verts.tobytes(), idxs.tobytes(), len(verts), len(idxs))
        )
        radius = (
            float(np.linalg.norm(verts["position"], axis=1).max())
            if len(verts) > 0
            else 0.0
        )
        return MeshHandle(
            id=mesh_id,
            vertex_count=len(verts),
            index_count=len(idxs),
            bounding_radius=radius,
        )

    def upload_texture_pixels(self, pixels: np.ndarray, width: int, height: int) -> int:
        """Mint a texture id and enqueue the pixel payload on the drain channel."""
        from ..web.resource_kinds import pack_mesh_texture

        tex_id = self._next_tex_id
        self._next_tex_id += 1
        self._resource_queue.append(
            pack_mesh_texture(tex_id, width, height, np.asarray(pixels, dtype=np.uint8))
        )
        return tex_id

    def drain_resources(self) -> list[tuple[int, int, bytes]]:
        """Return and clear pending ``(kind, id, payload)`` resource entries."""
        if not self._resource_queue:
            return []
        # Hand the existing list to the caller and start a fresh one, so the
        # returned entries are never mutated by later uploads.
        queue = self._resource_queue
        self._resource_queue = []
        return queue

    # -- Frame capture --

    def capture_frame(self) -> np.ndarray:
        """Return a zero-filled ``(height, width, 4)`` uint8 array.

        No pixels are read back by this method.
        """
        return np.zeros((self._height, self._width, 4), dtype=np.uint8)

    # -- Serialization --

    def _texture_ids_for(self, mat_id: int) -> tuple[int, int, int, int, int]:
        """Return (albedo, normal, metallic_roughness, emissive, ao) texture ids.

        Ids are clamped to ``>= 0``; an out-of-range ``mat_id`` yields all zeros.
        """
        if mat_id < 0 or mat_id >= len(self._materials):
            return (0, 0, 0, 0, 0)
        m = self._materials[mat_id]
        return (
            max(int(m["albedo_tex"]), 0),
            max(int(m["normal_tex"]), 0),
            max(int(m["metallic_roughness_tex"]), 0),
            max(int(m["emissive_tex"]), 0),
            max(int(m["ao_tex"]), 0),
        )

    def _pass_type_for(self, mat_id: int) -> int:
        """Classify a material id into a draw pass.

        Uses the same bounds rule as :meth:`_texture_ids_for`: an id outside
        ``0 <= mat_id < len(materials)`` is treated as opaque rather than being
        allowed to index the material table from the end.
        """
        if 0 <= mat_id < len(self._materials):
            material = self._materials[mat_id]
            if material["alpha_mode"] != ALPHA_OPAQUE:
                return self._PASS_TRANSPARENT
            if material["double_sided"]:
                return self._PASS_DOUBLE_SIDED
        return self._PASS_OPAQUE

    def _collect_viewports(self) -> list[dict[str, Any]]:
        """Describe every managed viewport, or one full-size identity fallback."""
        viewports: list[dict[str, Any]] = [
            {
                "x": vp.x,
                "y": vp.y,
                "width": vp.width,
                "height": vp.height,
                "view_matrix": vp.camera_view,
                "proj_matrix": vp.camera_proj,
            }
            for _vp_id, vp in self.viewport_manager.get_all()
        ]
        if not viewports:
            viewports.append({
                "x": 0,
                "y": 0,
                "width": self._width,
                "height": self._height,
                "view_matrix": np.eye(4, dtype=np.float32),
                "proj_matrix": np.eye(4, dtype=np.float32),
            })
        return viewports

    def _post_process_state(self) -> dict[str, Any] | None:
        """Return the post-process settings, or ``None`` when no effect is on."""
        # Emit post-process when any effect is active — bloom OR fog triggers it.
        if not (self._bloom_enabled or self._fog_enabled):
            return None
        return {
            "bloom_enabled": self._bloom_enabled,
            "bloom_threshold": self._bloom_threshold,
            "bloom_intensity": self._bloom_intensity,
            "bloom_soft_knee": self._bloom_soft_knee,
            "fog_enabled": self._fog_enabled,
            "fog_mode": self._fog_mode,
            "fog_density": self._fog_density,
            "fog_start": self._fog_start,
            "fog_end": self._fog_end,
            "fog_colour": self._fog_colour,
            "fog_height": self._fog_height,
            "fog_height_density": self._fog_height_density,
        }

    def serialize_frame(self) -> bytes:
        """Group instances by mesh + pass + texture-set, then serialize.

        Draw groups are emitted in the order opaque, double-sided, transparent;
        transparent groups are sorted with ``compute_sort_key`` using each
        group's first transform and the first viewport's camera position. The
        frame id is incremented after every call.
        """
        from ..streaming.scene3d_serializer import Scene3DSerializer

        viewports = self._collect_viewports()

        # Group key = (mesh_id, pass_type, albedo, normal, mr, emissive, ao).
        # All 5 tex_ids come from the material bound to each instance; instances
        # sharing a material naturally collapse into one group, and we still get
        # correct per-group bind groups on the JS side.
        groups: dict[
            tuple[int, ...], tuple[MeshHandle, list[np.ndarray], list[int]]
        ] = {}
        # Pass type and texture ids depend only on the material id, so resolve
        # each distinct id once per frame instead of once per instance.
        per_material: dict[int, tuple[int, tuple[int, int, int, int, int]]] = {}
        for mesh_handle, transform, mat_id, _vp_id in self._instances:
            info = per_material.get(mat_id)
            if info is None:
                info = (self._pass_type_for(mat_id), self._texture_ids_for(mat_id))
                per_material[mat_id] = info
            pass_type, tex_ids = info
            key = (mesh_handle.id, pass_type, *tex_ids)
            _handle, transforms, mat_ids = groups.setdefault(key, (mesh_handle, [], []))
            transforms.append(transform)
            mat_ids.append(mat_id)

        opaque_groups: list[dict[str, Any]] = []
        double_sided_groups: list[dict[str, Any]] = []
        transparent_groups: list[dict[str, Any]] = []
        for key, (mesh_handle, transforms, mat_ids) in groups.items():
            mid, pass_type, albedo, normal, mr, emissive, ao = key
            group = {
                "mesh_id": mid,
                "index_count": mesh_handle.index_count,
                "transforms": np.array(transforms, dtype=np.float32).reshape(-1, 4, 4),
                "material_ids": np.array(mat_ids, dtype=np.uint32),
                "pass_type": pass_type,
                "albedo_tex": albedo,
                "normal_tex": normal,
                "metallic_roughness_tex": mr,
                "emissive_tex": emissive,
                "ao_tex": ao,
            }
            if pass_type == self._PASS_TRANSPARENT:
                transparent_groups.append(group)
            elif pass_type == self._PASS_DOUBLE_SIDED:
                double_sided_groups.append(group)
            else:
                opaque_groups.append(group)

        if transparent_groups and viewports:
            cam_pos = extract_camera_position(
                np.asarray(viewports[0]["view_matrix"], dtype=np.float32)
            )
            transparent_groups.sort(
                key=lambda g: compute_sort_key(g["transforms"][0], cam_pos)
            )

        draw_groups: list[dict[str, Any]] = (
            opaque_groups + double_sided_groups + transparent_groups
        )

        tilemap_layers: list[dict[str, Any]] | None = None
        if self._tilemap_layers:
            tilemap_layers = [
                {"tileset_tex": tex_id, "tile_size": tile_size, "tiles": tiles}
                for tiles, tex_id, tile_size in self._tilemap_layers
            ]

        data = Scene3DSerializer.serialize_frame(
            self._frame_id,
            viewports,
            self._lights,
            draw_groups,
            self._materials,
            self._post_process_state(),
            tilemap_layers,
        )
        self._frame_id += 1
        return data