"""WebGPU 3D renderer — collects submissions and serializes to binary via Scene3DSerializer.
Implements the Renderer ABC without any Vulkan dependency. Resource uploads
(meshes + material textures) travel on the separate drain channel to the
browser's Renderer3D, matching the 2D overlay path and the Vulkan backend.
Frame binaries carry only per-frame state (viewports, lights, materials,
draw groups, post-process).
"""
import logging
from typing import Any
import numpy as np
from .._types import ALPHA_OPAQUE, LIGHT_DTYPE, MATERIAL_DTYPE, VERTEX_DTYPE, MeshHandle
from ._base import Renderer
from .tile_types import TILE_INSTANCE_DTYPE
from .transparency import compute_sort_key, extract_camera_position
from .viewport_manager import ViewportManager
log = logging.getLogger(__name__)
__all__ = ["WebRenderer3D"]
# Default tex_id 0 is the shared 1×1 white texture baked into the JS renderer;
# allocated ids start at 1 so any tex_id <=0 means "no texture, use default".
[docs]
class WebRenderer3D(Renderer):
"""WebGPU 3D renderer — serializes scene state + streams uploads via drain channel."""
def __init__(self, width: int, height: int) -> None:
self._width = width
self._height = height
self._instances: list[tuple[MeshHandle, np.ndarray, int, int]] = []
# Per-frame tilemap layer submissions: (tile_data, tileset_tex_id, tile_size).
self._tilemap_layers: list[tuple[np.ndarray, int, tuple[float, float]]] = []
self._materials: np.ndarray = np.empty(0, dtype=MATERIAL_DTYPE)
self._lights: np.ndarray = np.empty(0, dtype=LIGHT_DTYPE)
self._next_mesh_id = 0
self._next_tex_id = 1
self._mesh_data: dict[int, tuple[np.ndarray, np.ndarray]] = {}
# Drain queue: list of (kind, id, payload_bytes) TLV tuples. WebApp
# flushes this into the unified drain_resources() stream each tick.
self._resource_queue: list[tuple[int, int, bytes]] = []
self._frame_id = 0
self.viewport_manager = ViewportManager()
# Post-process state (bloom + distance/height fog)
self._bloom_enabled = False
self._bloom_threshold = 1.0
self._bloom_intensity = 0.8
self._bloom_soft_knee = 0.5
self._fog_enabled = False
self._fog_mode = 1 # 0=linear, 1=exponential, 2=exponential_squared
self._fog_density = 0.02
self._fog_start = 10.0
self._fog_end = 100.0
self._fog_colour: tuple[float, float, float, float] = (0.5, 0.6, 0.7, 1.0)
self._fog_height = 0.0
self._fog_height_density = 0.0
# -- Frame lifecycle --
[docs]
def begin_frame(self) -> None:
self._instances.clear()
self._tilemap_layers.clear()
[docs]
def pre_render(self, cmd: Any) -> None:
pass
[docs]
def render(self, cmd: Any) -> None:
pass
[docs]
def resize(self, width: int, height: int) -> None:
self._width, self._height = width, height
[docs]
def destroy(self) -> None:
self._mesh_data.clear()
self._resource_queue.clear()
# -- Scene submissions --
[docs]
def submit_instance(self, mesh_handle: MeshHandle, transform: np.ndarray, material_id: int = 0,
viewport_id: int = 0) -> None:
self._instances.append((mesh_handle, np.array(transform, dtype=np.float32), material_id, viewport_id))
[docs]
def submit_multimesh(self, mesh_handle: MeshHandle, transforms: np.ndarray, material_id: int = 0,
material_ids: np.ndarray | None = None, viewport_id: int = 0, count: int = 0) -> None:
t = np.asarray(transforms, dtype=np.float32)
n = count if count > 0 else len(t)
for i in range(n):
mid = int(material_ids[i]) if material_ids is not None and i < len(material_ids) else material_id
self._instances.append((mesh_handle, t[i], mid, viewport_id))
[docs]
def submit_skinned_instance(self, mesh_handle: MeshHandle, transform: np.ndarray, material_id: int,
joint_matrices: np.ndarray) -> None:
self._instances.append((mesh_handle, np.array(transform, dtype=np.float32), material_id, 0))
[docs]
def set_materials(self, materials: np.ndarray) -> None:
self._materials = np.asarray(materials, dtype=MATERIAL_DTYPE)
[docs]
def set_lights(self, lights: np.ndarray) -> None:
self._lights = np.asarray(lights, dtype=LIGHT_DTYPE)
[docs]
def submit_text(self, text: str, x: float, y: float, size: float,
colour: tuple[float, float, float, float], **kwargs: Any) -> None:
pass
[docs]
def submit_particles(self, particle_data: np.ndarray) -> None:
pass
[docs]
def submit_light2d(self, **kwargs: Any) -> None:
pass
[docs]
def submit_tilemap_layer(self, tile_data: np.ndarray, tileset_tex_id: int,
tile_size: tuple[float, float]) -> None:
"""Queue a tilemap layer for serialization in the current frame.
Mirrors the Vulkan ``TileMapPass.submit_layer`` signature so
``SceneAdapter`` can dispatch to either backend uniformly.
"""
if len(tile_data) == 0:
return
tiles = np.ascontiguousarray(tile_data, dtype=TILE_INSTANCE_DTYPE)
self._tilemap_layers.append((tiles, int(tileset_tex_id), (float(tile_size[0]), float(tile_size[1]))))
[docs]
def set_post_process(self, bloom_enabled: bool = False, bloom_threshold: float = 1.0,
bloom_intensity: float = 0.8, bloom_soft_knee: float = 0.5,
fog_enabled: bool = False, fog_mode: int = 1,
fog_density: float = 0.02, fog_start: float = 10.0,
fog_end: float = 100.0,
fog_colour: tuple[float, float, float, float] = (0.5, 0.6, 0.7, 1.0),
fog_height: float = 0.0, fog_height_density: float = 0.0) -> None:
"""Update post-processing settings (synced from WorldEnvironment)."""
self._bloom_enabled = bloom_enabled
self._bloom_threshold = bloom_threshold
self._bloom_intensity = bloom_intensity
self._bloom_soft_knee = bloom_soft_knee
self._fog_enabled = fog_enabled
self._fog_mode = fog_mode
self._fog_density = fog_density
self._fog_start = fog_start
self._fog_end = fog_end
self._fog_colour = fog_colour
self._fog_height = fog_height
self._fog_height_density = fog_height_density
# -- Resource management --
[docs]
def register_mesh(self, vertices: np.ndarray, indices: np.ndarray) -> MeshHandle:
from ..web.resource_kinds import pack_mesh
verts = np.asarray(vertices, dtype=VERTEX_DTYPE)
idxs = np.asarray(indices, dtype=np.uint32)
mesh_id = self._next_mesh_id
self._next_mesh_id += 1
self._mesh_data[mesh_id] = (verts, idxs)
self._resource_queue.append(pack_mesh(
mesh_id, verts.tobytes(), idxs.tobytes(), len(verts), len(idxs),
))
radius = float(np.linalg.norm(verts["position"], axis=1).max()) if len(verts) > 0 else 0.0
return MeshHandle(id=mesh_id, vertex_count=len(verts), index_count=len(idxs), bounding_radius=radius)
[docs]
def upload_texture_pixels(self, pixels: np.ndarray, width: int, height: int) -> int:
"""Mint a texture id and enqueue the pixel payload on the drain channel."""
from ..web.resource_kinds import pack_mesh_texture
tex_id = self._next_tex_id
self._next_tex_id += 1
self._resource_queue.append(pack_mesh_texture(
tex_id, width, height, np.asarray(pixels, dtype=np.uint8),
))
return tex_id
[docs]
def drain_resources(self) -> list[tuple[int, int, bytes]]:
"""Return and clear pending ``(kind, id, payload)`` resource entries."""
if not self._resource_queue:
return []
queue = self._resource_queue
self._resource_queue = []
return queue
# -- Frame capture --
[docs]
def capture_frame(self) -> np.ndarray:
return np.zeros((self._height, self._width, 4), dtype=np.uint8)
# -- Serialization --
def _texture_ids_for(self, mat_id: int) -> tuple[int, int, int, int, int]:
if mat_id < 0 or mat_id >= len(self._materials):
return (0, 0, 0, 0, 0)
m = self._materials[mat_id]
return (
max(int(m["albedo_tex"]), 0),
max(int(m["normal_tex"]), 0),
max(int(m["metallic_roughness_tex"]), 0),
max(int(m["emissive_tex"]), 0),
max(int(m["ao_tex"]), 0),
)
[docs]
def serialize_frame(self) -> bytes:
"""Group instances by mesh + pass + texture-set, then serialize."""
from ..streaming.scene3d_serializer import Scene3DSerializer
viewports: list[dict[str, Any]] = []
for _vp_id, vp in self.viewport_manager.get_all():
viewports.append({
"x": vp.x, "y": vp.y, "width": vp.width, "height": vp.height,
"view_matrix": vp.camera_view, "proj_matrix": vp.camera_proj,
})
if not viewports:
viewports.append({
"x": 0, "y": 0, "width": self._width, "height": self._height,
"view_matrix": np.eye(4, dtype=np.float32), "proj_matrix": np.eye(4, dtype=np.float32),
})
mat_count = len(self._materials)
# Group key = (mesh_id, pass_type, albedo, normal, mr, emissive, ao).
# All 5 tex_ids come from the material bound to each instance; instances
# sharing a material naturally collapse into one group, and we still get
# correct per-group bind groups on the JS side.
groups: dict[tuple[int, int, int, int, int, int, int],
tuple[MeshHandle, list[np.ndarray], list[int]]] = {}
for mesh_handle, transform, mat_id, _vp_id in self._instances:
if mat_id < mat_count and self._materials[mat_id]["alpha_mode"] != ALPHA_OPAQUE:
pass_type = 2
elif mat_id < mat_count and self._materials[mat_id]["double_sided"]:
pass_type = 1
else:
pass_type = 0
tex_ids = self._texture_ids_for(mat_id)
key = (mesh_handle.id, pass_type, *tex_ids)
if key not in groups:
groups[key] = (mesh_handle, [], [])
groups[key][1].append(transform)
groups[key][2].append(mat_id)
opaque_groups: list[dict[str, Any]] = []
double_sided_groups: list[dict[str, Any]] = []
transparent_groups: list[dict[str, Any]] = []
for key, (mesh_handle, transforms, mat_ids) in groups.items():
mid, pass_type, albedo, normal, mr, emissive, ao = key
group = {
"mesh_id": mid,
"index_count": mesh_handle.index_count,
"transforms": np.array(transforms, dtype=np.float32).reshape(-1, 4, 4),
"material_ids": np.array(mat_ids, dtype=np.uint32),
"pass_type": pass_type,
"albedo_tex": albedo,
"normal_tex": normal,
"metallic_roughness_tex": mr,
"emissive_tex": emissive,
"ao_tex": ao,
}
if pass_type == 2:
transparent_groups.append(group)
elif pass_type == 1:
double_sided_groups.append(group)
else:
opaque_groups.append(group)
if transparent_groups and viewports:
cam_pos = extract_camera_position(np.asarray(viewports[0]["view_matrix"], dtype=np.float32))
transparent_groups.sort(key=lambda g: compute_sort_key(g["transforms"][0], cam_pos))
draw_groups: list[dict[str, Any]] = opaque_groups + double_sided_groups + transparent_groups
# Emit post-process when any effect is active — bloom OR fog triggers it.
post_process = None
if self._bloom_enabled or self._fog_enabled:
post_process = {
"bloom_enabled": self._bloom_enabled,
"bloom_threshold": self._bloom_threshold,
"bloom_intensity": self._bloom_intensity,
"bloom_soft_knee": self._bloom_soft_knee,
"fog_enabled": self._fog_enabled,
"fog_mode": self._fog_mode,
"fog_density": self._fog_density,
"fog_start": self._fog_start,
"fog_end": self._fog_end,
"fog_colour": self._fog_colour,
"fog_height": self._fog_height,
"fog_height_density": self._fog_height_density,
}
tilemap_layers: list[dict[str, Any]] | None = None
if self._tilemap_layers:
tilemap_layers = [
{"tileset_tex": tex_id, "tile_size": tile_size, "tiles": tiles}
for tiles, tex_id, tile_size in self._tilemap_layers
]
data = Scene3DSerializer.serialize_frame(
self._frame_id, viewports, self._lights, draw_groups, self._materials,
post_process, tilemap_layers,
)
self._frame_id += 1
return data