Source code for simvx.graphics.streaming.scene3d_serializer

"""Binary serialization of 3D render state for the web export.

Serializes one 3D scene frame into compact binary for the browser-side WebGPU
renderer. Resource uploads (meshes, textures) travel over the separate TLV
drain channel — see ``simvx.graphics.web.resource_kinds`` — so this frame
payload now carries only per-frame state.

Wire format per frame::

    HEADER (20 bytes):
        frame_id(u32) flags(u32) viewport_count(u32) light_count(u32) draw_group_count(u32)

    Per VIEWPORT (144 bytes):
        x(u32) y(u32) w(u32) h(u32) + view_mat(16×f32) + proj_mat(16×f32)

    MATERIALS:
        material_count(u32) + material_bytes(material_count × MATERIAL_DTYPE.itemsize)

    LIGHTS (light_count × LIGHT_DTYPE.itemsize bytes):
        Raw LIGHT_DTYPE data

    DRAW GROUPS (draw_group_count entries):
        Per group (36-byte header):
            mesh_id(u32) index_count(u32) instance_count(u32) pass_type(u32)
            albedo_tex(u32) normal_tex(u32) metallic_roughness_tex(u32)
            emissive_tex(u32) ao_tex(u32)
        transform_bytes(instance_count × 64)  -- 4×4 model matrix per instance, f32
        material_ids(instance_count × u32)
        pass_type: 0=OPAQUE, 1=DOUBLE_SIDED, 2=TRANSPARENT

    POST-PROCESS (only if flags bit 0 set, 64 bytes):
        bloom_enabled(u32) bloom_threshold(f32) bloom_intensity(f32) bloom_soft_knee(f32)
        fog_colour(4×f32) — rgb + density packed per Vulkan forward pass convention
        fog_params(4×f32)  — start, end, mode (0/1/2 linear/exp/exp²), enabled (0|1)
        fog_height_params(4×f32) — height, height_density, _pad, _pad

    TILEMAPS (only if flags bit 1 set):
        layer_count(u32)
        Per layer:
            tileset_tex(u32) tile_count(u32) tile_w(f32) tile_h(f32)
            tiles(tile_count × 32 bytes)  -- TILE_INSTANCE_DTYPE
"""

import struct
from typing import Any

import numpy as np

from .._types import LIGHT_DTYPE, MATERIAL_DTYPE
from ..renderer.tile_types import TILE_INSTANCE_DTYPE

__all__ = ["FLAG_HAS_POST_PROCESS", "FLAG_HAS_TILEMAPS", "Scene3DSerializer"]

# Bits of the frame header ``flags`` field; each gates an optional trailing section.
FLAG_HAS_POST_PROCESS = 0x1
FLAG_HAS_TILEMAPS = 0x2

# All layouts are little-endian with no implicit padding.

# Frame header (20 B): frame_id, flags, viewport_count, light_count, draw_group_count.
_HEADER = struct.Struct("<5I")
# Viewport rectangle (16 B): x, y, w, h.
_VP_RECT = struct.Struct("<4I")
# Draw group header (36 B): mesh_id, index_count, instance_count, pass_type,
# followed by the five texture ids.
_GROUP_HEADER = struct.Struct("<9I")
# Post-process bloom (16 B): bloom_enabled, threshold, intensity, soft_knee.
_POST_PROCESS_BLOOM = struct.Struct("<I3f")
# Post-process fog (48 B): colour(4f), params(4f), height_params(4f).
_POST_PROCESS_FOG = struct.Struct("<12f")
# Combined size of the post-process section (64 B).
_POST_PROCESS_SIZE = _POST_PROCESS_FOG.size + _POST_PROCESS_BLOOM.size
# Tilemap layer header (16 B): tileset_tex, tile_count, tile_w, tile_h.
_TILEMAP_LAYER_HEADER = struct.Struct("<2I2f")

# One 4×4 float32 matrix.
_MAT4_BYTES = 16 * 4


class Scene3DSerializer:
    """Serializes 3D scene state into compact binary frames for WebGPU."""

    @staticmethod
    def _mat4_bytes(matrix: Any, label: str) -> bytes:
        """Return *matrix* as 64 bytes of float32, rejecting anything that is not 16 values."""
        arr = np.asarray(matrix, dtype=np.float32)
        if arr.size != _MAT4_BYTES // 4:
            raise ValueError(f"{label} must contain 16 float32 values, got {arr.size}")
        return arr.tobytes()

    @staticmethod
    def _read_array(data: bytes, dtype: Any, count: int, offset: int) -> np.ndarray:
        """Return an owned copy of *count* items of *dtype* read from *data* at *offset*."""
        if count == 0:
            return np.empty(0, dtype=dtype)
        # frombuffer reads in place; the single copy detaches the result from ``data``.
        return np.frombuffer(data, dtype=dtype, count=count, offset=offset).copy()

    @staticmethod
    def serialize_frame(
        frame_id: int,
        viewports: list[dict[str, Any]],
        lights: np.ndarray,
        draw_groups: list[dict[str, Any]],
        materials: np.ndarray | None = None,
        post_process: dict[str, Any] | None = None,
        tilemap_layers: list[dict[str, Any]] | None = None,
    ) -> bytes:
        """Serialize a 3D scene frame.

        Args:
            frame_id: Monotonic frame counter.
            viewports: ``[{x, y, width, height, view_matrix, proj_matrix}]``
                (matrices 4×4 float32).
            lights: ``LIGHT_DTYPE`` structured array. ``None`` or an empty
                array writes a light count of 0 and no light bytes.
            draw_groups: ``[{mesh_id, index_count, transforms, material_ids,
                pass_type, albedo_tex, normal_tex, metallic_roughness_tex,
                emissive_tex, ao_tex}]``. The instance count written to the
                wire is the number of ``material_ids``; ``transforms`` must
                supply exactly one 4×4 float32 matrix per id. ``pass_type``
                and the texture ids default to 0 when absent. The texture ids
                drive per-group bind group selection on the JS side (the JS
                renderer falls back to a shared 1×1 white texture for id 0).
            materials: ``MATERIAL_DTYPE`` structured array (uploaded every
                frame; defaults to empty).
            post_process: Optional settings dict; any non-``None`` value sets
                ``FLAG_HAS_POST_PROCESS``. Keys and the defaults used when a
                key is absent: ``bloom_enabled`` (False), ``bloom_threshold``
                (1.0), ``bloom_intensity`` (0.8), ``bloom_soft_knee`` (0.5),
                ``fog_colour`` ((0.5, 0.6, 0.7, 1.0); only the first three
                components are written), ``fog_density`` (0.02), ``fog_start``
                (10.0), ``fog_end`` (100.0), ``fog_mode`` (1), ``fog_enabled``
                (False), ``fog_height`` (0.0), ``fog_height_density`` (0.0).
            tilemap_layers: ``[{tileset_tex, tile_size (w, h), tiles
                (TILE_INSTANCE_DTYPE structured array)}]``. Gated behind
                ``FLAG_HAS_TILEMAPS``; omitted entirely when falsy so
                tilemap-less scenes pay zero bytes.

        Returns:
            Compact binary frame ready for transmission to the browser.

        Raises:
            ValueError: If a viewport matrix does not hold 16 values, or a
                draw group's ``transforms`` do not match its ``material_ids``
                count. Either would otherwise emit a frame whose later
                sections are misaligned.
        """
        flags = FLAG_HAS_POST_PROCESS if post_process is not None else 0
        if tilemap_layers:
            flags |= FLAG_HAS_TILEMAPS

        light_count = len(lights) if lights is not None else 0

        parts: list[bytes] = [
            _HEADER.pack(frame_id, flags, len(viewports), light_count, len(draw_groups))
        ]

        for vp in viewports:
            parts.append(_VP_RECT.pack(vp["x"], vp["y"], vp["width"], vp["height"]))
            parts.append(Scene3DSerializer._mat4_bytes(vp["view_matrix"], "view_matrix"))
            parts.append(Scene3DSerializer._mat4_bytes(vp["proj_matrix"], "proj_matrix"))

        if materials is not None:
            mats = np.asarray(materials, dtype=MATERIAL_DTYPE)
        else:
            mats = np.empty(0, dtype=MATERIAL_DTYPE)
        parts.append(struct.pack("<I", len(mats)))
        parts.append(mats.tobytes())

        if light_count > 0:
            parts.append(np.asarray(lights, dtype=LIGHT_DTYPE).tobytes())

        floats_per_mat4 = _MAT4_BYTES // 4
        for group in draw_groups:
            transforms = np.asarray(group["transforms"], dtype=np.float32)
            # Flatten so the count in the header always equals the ids written.
            mat_ids = np.asarray(group["material_ids"], dtype=np.uint32).ravel()
            instance_count = mat_ids.size
            if transforms.size != instance_count * floats_per_mat4:
                raise ValueError(
                    f"draw group mesh_id={group['mesh_id']}: transforms hold "
                    f"{transforms.size} floats but {instance_count} material_ids "
                    f"require {instance_count * floats_per_mat4}"
                )
            parts.append(_GROUP_HEADER.pack(
                group["mesh_id"],
                group["index_count"],
                instance_count,
                group.get("pass_type", 0),
                int(group.get("albedo_tex", 0)),
                int(group.get("normal_tex", 0)),
                int(group.get("metallic_roughness_tex", 0)),
                int(group.get("emissive_tex", 0)),
                int(group.get("ao_tex", 0)),
            ))
            parts.append(transforms.tobytes())
            parts.append(mat_ids.tobytes())

        if post_process is not None:
            parts.append(_POST_PROCESS_BLOOM.pack(
                1 if post_process.get("bloom_enabled", False) else 0,
                float(post_process.get("bloom_threshold", 1.0)),
                float(post_process.get("bloom_intensity", 0.8)),
                float(post_process.get("bloom_soft_knee", 0.5)),
            ))
            fc = post_process.get("fog_colour", (0.5, 0.6, 0.7, 1.0))
            fog_enabled_f = 1.0 if post_process.get("fog_enabled", False) else 0.0
            parts.append(_POST_PROCESS_FOG.pack(
                # fog_colour vec4: rgb, with density packed into the 4th slot.
                float(fc[0]),
                float(fc[1]),
                float(fc[2]),
                float(post_process.get("fog_density", 0.02)),
                # fog_params vec4: start, end, mode, enabled.
                float(post_process.get("fog_start", 10.0)),
                float(post_process.get("fog_end", 100.0)),
                float(post_process.get("fog_mode", 1)),
                fog_enabled_f,
                # fog_height_params vec4: height, height_density, pad, pad.
                float(post_process.get("fog_height", 0.0)),
                float(post_process.get("fog_height_density", 0.0)),
                0.0,
                0.0,
            ))

        if tilemap_layers:
            parts.append(struct.pack("<I", len(tilemap_layers)))
            for layer in tilemap_layers:
                tiles = np.ascontiguousarray(layer["tiles"], dtype=TILE_INSTANCE_DTYPE)
                tw, th = layer["tile_size"]
                parts.append(_TILEMAP_LAYER_HEADER.pack(
                    int(layer["tileset_tex"]),
                    len(tiles),
                    float(tw),
                    float(th),
                ))
                parts.append(tiles.tobytes())

        return b"".join(parts)

    @staticmethod
    def deserialize_frame(data: bytes) -> dict[str, Any]:
        """Deserialize a frame back into structured data (for testing).

        Args:
            data: A frame produced by :meth:`serialize_frame`.

        Returns:
            Dict with ``frame_id``, ``flags``, ``viewports``, ``materials``,
            ``lights`` and ``draw_groups``; plus ``post_process`` when
            ``FLAG_HAS_POST_PROCESS`` is set and ``tilemap_layers`` when
            ``FLAG_HAS_TILEMAPS`` is set. All arrays are owned copies.
            ``post_process["fog_colour"]`` is the raw packed vec4, so its
            fourth component equals ``fog_density``.
        """
        read = Scene3DSerializer._read_array
        floats_per_mat4 = _MAT4_BYTES // 4

        off = 0
        frame_id, flags, vp_count, light_count, group_count = _HEADER.unpack_from(data, off)
        off += _HEADER.size

        viewports = []
        for _ in range(vp_count):
            x, y, w, h = _VP_RECT.unpack_from(data, off)
            off += _VP_RECT.size
            view_mat = read(data, np.float32, floats_per_mat4, off).reshape(4, 4)
            off += _MAT4_BYTES
            proj_mat = read(data, np.float32, floats_per_mat4, off).reshape(4, 4)
            off += _MAT4_BYTES
            viewports.append({
                "x": x,
                "y": y,
                "width": w,
                "height": h,
                "view_matrix": view_mat,
                "proj_matrix": proj_mat,
            })

        (mat_count,) = struct.unpack_from("<I", data, off)
        off += 4
        materials = read(data, MATERIAL_DTYPE, mat_count, off)
        off += mat_count * MATERIAL_DTYPE.itemsize

        lights = read(data, LIGHT_DTYPE, light_count, off)
        off += light_count * LIGHT_DTYPE.itemsize

        draw_groups = []
        for _ in range(group_count):
            (
                mesh_id,
                idx_count,
                inst_count,
                pass_type,
                albedo_tex,
                normal_tex,
                mr_tex,
                emissive_tex,
                ao_tex,
            ) = _GROUP_HEADER.unpack_from(data, off)
            off += _GROUP_HEADER.size
            transforms = read(data, np.float32, inst_count * floats_per_mat4, off)
            transforms = transforms.reshape(inst_count, 4, 4)
            off += inst_count * _MAT4_BYTES
            mat_ids = read(data, np.uint32, inst_count, off)
            off += inst_count * 4
            draw_groups.append({
                "mesh_id": mesh_id,
                "index_count": idx_count,
                "transforms": transforms,
                "material_ids": mat_ids,
                "pass_type": pass_type,
                "albedo_tex": albedo_tex,
                "normal_tex": normal_tex,
                "metallic_roughness_tex": mr_tex,
                "emissive_tex": emissive_tex,
                "ao_tex": ao_tex,
            })

        result: dict[str, Any] = {
            "frame_id": frame_id,
            "flags": flags,
            "viewports": viewports,
            "materials": materials,
            "lights": lights,
            "draw_groups": draw_groups,
        }

        if flags & FLAG_HAS_POST_PROCESS:
            bloom_en, bloom_thresh, bloom_int, bloom_knee = _POST_PROCESS_BLOOM.unpack_from(data, off)
            off += _POST_PROCESS_BLOOM.size
            fog = _POST_PROCESS_FOG.unpack_from(data, off)
            off += _POST_PROCESS_FOG.size
            result["post_process"] = {
                "bloom_enabled": bool(bloom_en),
                "bloom_threshold": bloom_thresh,
                "bloom_intensity": bloom_int,
                "bloom_soft_knee": bloom_knee,
                # Raw packed vec4 (r, g, b, density), exactly as stored on the wire.
                "fog_colour": (fog[0], fog[1], fog[2], fog[3]),
                "fog_density": fog[3],
                "fog_start": fog[4],
                "fog_end": fog[5],
                "fog_mode": int(fog[6]),
                "fog_enabled": bool(fog[7]),
                "fog_height": fog[8],
                "fog_height_density": fog[9],
            }

        if flags & FLAG_HAS_TILEMAPS:
            tilemap_layers: list[dict[str, Any]] = []
            (layer_count,) = struct.unpack_from("<I", data, off)
            off += 4
            for _ in range(layer_count):
                tex, tile_count, tw, th = _TILEMAP_LAYER_HEADER.unpack_from(data, off)
                off += _TILEMAP_LAYER_HEADER.size
                tiles = read(data, TILE_INSTANCE_DTYPE, tile_count, off)
                off += tile_count * TILE_INSTANCE_DTYPE.itemsize
                tilemap_layers.append({
                    "tileset_tex": tex,
                    "tile_size": (tw, th),
                    "tiles": tiles,
                })
            result["tilemap_layers"] = tilemap_layers

        return result