"""Binary serialization of 3D render state for the web export.
Serializes one 3D scene frame into compact binary for the browser-side WebGPU
renderer. Resource uploads (meshes, textures) travel over the separate TLV
drain channel — see ``simvx.graphics.web.resource_kinds`` — so this frame
payload now carries only per-frame state.
Wire format per frame::
HEADER (20 bytes):
frame_id(u32) flags(u32) viewport_count(u32) light_count(u32) draw_group_count(u32)
Per VIEWPORT (144 bytes):
x(u32) y(u32) w(u32) h(u32) + view_mat(16×f32) + proj_mat(16×f32)
MATERIALS:
material_count(u32) + material_bytes(material_count × MATERIAL_DTYPE.itemsize)
LIGHTS (light_count × LIGHT_DTYPE.itemsize bytes):
Raw LIGHT_DTYPE data
DRAW GROUPS (draw_group_count entries):
Per group (36-byte header):
mesh_id(u32) index_count(u32) instance_count(u32) pass_type(u32)
albedo_tex(u32) normal_tex(u32) metallic_roughness_tex(u32)
emissive_tex(u32) ao_tex(u32)
transform_bytes(instance_count × 64) -- 4×4 model matrix per instance, f32
material_ids(instance_count × u32)
pass_type: 0=OPAQUE, 1=DOUBLE_SIDED, 2=TRANSPARENT
POST-PROCESS (only if flags bit 0 set, 64 bytes):
bloom_enabled(u32) bloom_threshold(f32) bloom_intensity(f32) bloom_soft_knee(f32)
fog_colour(4×f32) — rgb + density packed per Vulkan forward pass convention
fog_params(4×f32) — start, end, mode (0/1/2 linear/exp/exp²), enabled (0|1)
fog_height_params(4×f32) — height, height_density, _pad, _pad
TILEMAPS (only if flags bit 1 set):
layer_count(u32)
Per layer:
tileset_tex(u32) tile_count(u32) tile_w(f32) tile_h(f32)
tiles(tile_count × 32 bytes) -- TILE_INSTANCE_DTYPE
"""
import struct
from typing import Any
import numpy as np
from .._types import LIGHT_DTYPE, MATERIAL_DTYPE
from ..renderer.tile_types import TILE_INSTANCE_DTYPE
# Names re-exported by ``from <module> import *``.
__all__ = [
    "FLAG_HAS_POST_PROCESS",
    "FLAG_HAS_TILEMAPS",
    "Scene3DSerializer",
]

# Bits of the header ``flags`` word; each one gates an optional trailing section.
FLAG_HAS_POST_PROCESS = 0b01  # bit 0: a POST-PROCESS section is present
FLAG_HAS_TILEMAPS = 0b10  # bit 1: a TILEMAPS section is present
# All layouts are little-endian ("<"), so struct inserts no padding.

# Frame header, 5 × u32 (20 B):
# frame_id, flags, viewport_count, light_count, draw_group_count
_HEADER = struct.Struct("<5I")

# Viewport rectangle, 4 × u32 (16 B): x, y, w, h
_VP_RECT = struct.Struct("<4I")

# Draw-group header, 9 × u32 (36 B):
# mesh_id, index_count, instance_count, pass_type, then five texture ids
_GROUP_HEADER = struct.Struct("<9I")

# Post-process bloom (16 B): bloom_enabled(u32), threshold, intensity, soft_knee (f32)
_POST_PROCESS_BLOOM = struct.Struct("<I3f")

# Post-process fog, 12 × f32 (48 B): colour(4), params(4), height_params(4)
_POST_PROCESS_FOG = struct.Struct("<12f")

# Size of the whole post-process section: bloom + fog = 64 B
_POST_PROCESS_SIZE = _POST_PROCESS_FOG.size + _POST_PROCESS_BLOOM.size

# Tilemap layer header (16 B): tileset_tex(u32), tile_count(u32), tile_w(f32), tile_h(f32)
_TILEMAP_LAYER_HEADER = struct.Struct("<2I2f")

# One 4×4 matrix of 4-byte floats
_MAT4_BYTES = 4 * 4 * 4
class Scene3DSerializer:
    """Serializes 3D scene state into compact binary frames for WebGPU.

    The class is a stateless namespace: every method is static. The byte
    layout produced and consumed here is the one described in the module
    docstring.
    """

    @staticmethod
    def _mat4_bytes(matrix: Any, label: str) -> bytes:
        """Return *matrix* as 64 bytes of float32.

        Raises:
            ValueError: If *matrix* does not hold exactly 16 values. Writing a
                matrix of any other size would shift every later field of the
                frame and silently corrupt it.
        """
        arr = np.asarray(matrix, dtype=np.float32)
        if arr.size != 16:
            raise ValueError(f"{label} must hold 16 values (4x4), got shape {arr.shape}")
        return arr.tobytes()

    @staticmethod
    def _read_array(data: bytes, offset: int, dtype: Any, count: int) -> np.ndarray:
        """Copy *count* items of *dtype* out of *data*, starting at byte *offset*.

        ``np.frombuffer`` is given ``count``/``offset`` so the bytes are viewed
        in place and copied once, and so a payload that is too short raises
        ``ValueError`` rather than yielding a short array.
        """
        if count == 0:
            return np.empty(0, dtype=dtype)
        return np.frombuffer(data, dtype=dtype, count=count, offset=offset).copy()

    @staticmethod
    def serialize_frame(
        frame_id: int,
        viewports: list[dict[str, Any]],
        lights: np.ndarray,
        draw_groups: list[dict[str, Any]],
        materials: np.ndarray | None = None,
        post_process: dict[str, Any] | None = None,
        tilemap_layers: list[dict[str, Any]] | None = None,
    ) -> bytes:
        """Serialize a 3D scene frame.

        Args:
            frame_id: Monotonic frame counter.
            viewports: ``[{x, y, width, height, view_matrix, proj_matrix}]``
                (matrices 4×4 float32).
            lights: ``LIGHT_DTYPE`` structured array; ``None`` or empty writes
                no light data.
            draw_groups: ``[{mesh_id, index_count, transforms, material_ids,
                pass_type, albedo_tex, normal_tex, metallic_roughness_tex,
                emissive_tex, ao_tex}]``. The instance count written to the
                group header is ``len(material_ids)``; ``transforms`` must hold
                exactly that many 4×4 float32 matrices. ``pass_type`` defaults
                to 0. The texture ids drive per-group bind group selection on
                the JS side and default to 0 when absent (the JS renderer falls
                back to a shared 1×1 white texture for id 0).
            materials: ``MATERIAL_DTYPE`` structured array (uploaded every frame;
                defaults to empty).
            post_process: Optional settings dict; passing any dict (even an
                empty one) sets ``FLAG_HAS_POST_PROCESS``. Keys read, with the
                default used when a key is absent: ``bloom_enabled`` (False),
                ``bloom_threshold`` (1.0), ``bloom_intensity`` (0.8),
                ``bloom_soft_knee`` (0.5), ``fog_colour`` ((0.5, 0.6, 0.7, 1.0);
                only the first three components are written), ``fog_density``
                (0.02), ``fog_start`` (10.0), ``fog_end`` (100.0), ``fog_mode``
                (1), ``fog_enabled`` (False), ``fog_height`` (0.0),
                ``fog_height_density`` (0.0).
            tilemap_layers: ``[{tileset_tex, tile_size (w, h), tiles (TILE_INSTANCE_DTYPE
                structured array)}]``. Gated behind ``FLAG_HAS_TILEMAPS``; omitted entirely
                when falsy so tilemap-less scenes pay zero bytes.

        Returns:
            Compact binary frame ready for transmission to the browser.

        Raises:
            ValueError: If a viewport matrix does not hold 16 values, or a draw
                group's ``transforms`` does not match ``len(material_ids)``.
        """
        flags = 0
        if post_process is not None:
            flags |= FLAG_HAS_POST_PROCESS
        if tilemap_layers:
            flags |= FLAG_HAS_TILEMAPS

        light_count = 0 if lights is None else len(lights)

        parts: list[bytes] = [
            _HEADER.pack(frame_id, flags, len(viewports), light_count, len(draw_groups)),
        ]

        for vp in viewports:
            parts.append(_VP_RECT.pack(vp["x"], vp["y"], vp["width"], vp["height"]))
            parts.append(Scene3DSerializer._mat4_bytes(vp["view_matrix"], "view_matrix"))
            parts.append(Scene3DSerializer._mat4_bytes(vp["proj_matrix"], "proj_matrix"))

        if materials is None:
            mats = np.empty(0, dtype=MATERIAL_DTYPE)
        else:
            mats = np.asarray(materials, dtype=MATERIAL_DTYPE)
        parts.append(struct.pack("<I", len(mats)))
        parts.append(mats.tobytes())

        if light_count > 0:
            parts.append(np.asarray(lights, dtype=LIGHT_DTYPE).tobytes())

        for group in draw_groups:
            transforms = np.asarray(group["transforms"], dtype=np.float32)
            mat_ids = np.asarray(group["material_ids"], dtype=np.uint32)
            instance_count = len(mat_ids)
            # The reader sizes the transform block from instance_count alone,
            # so a mismatch here would misalign everything that follows.
            if transforms.nbytes != instance_count * _MAT4_BYTES:
                raise ValueError(
                    f"draw group (mesh_id={group.get('mesh_id')!r}): transforms has shape "
                    f"{transforms.shape} but material_ids implies {instance_count} instance(s)"
                )
            parts.append(_GROUP_HEADER.pack(
                group["mesh_id"], group["index_count"], instance_count,
                group.get("pass_type", 0),
                int(group.get("albedo_tex", 0)),
                int(group.get("normal_tex", 0)),
                int(group.get("metallic_roughness_tex", 0)),
                int(group.get("emissive_tex", 0)),
                int(group.get("ao_tex", 0)),
            ))
            parts.append(transforms.tobytes())
            parts.append(mat_ids.tobytes())

        if post_process is not None:
            parts.append(_POST_PROCESS_BLOOM.pack(
                1 if post_process.get("bloom_enabled", False) else 0,
                float(post_process.get("bloom_threshold", 1.0)),
                float(post_process.get("bloom_intensity", 0.8)),
                float(post_process.get("bloom_soft_knee", 0.5)),
            ))
            fc = post_process.get("fog_colour", (0.5, 0.6, 0.7, 1.0))
            fog_enabled_f = 1.0 if post_process.get("fog_enabled", False) else 0.0
            parts.append(_POST_PROCESS_FOG.pack(
                # fog_colour slot: rgb, with density stored in the 4th component
                float(fc[0]), float(fc[1]), float(fc[2]),
                float(post_process.get("fog_density", 0.02)),
                # fog_params slot: start, end, mode, enabled
                float(post_process.get("fog_start", 10.0)),
                float(post_process.get("fog_end", 100.0)),
                float(post_process.get("fog_mode", 1)),
                fog_enabled_f,
                # fog_height_params slot: height, height_density, two pad floats
                float(post_process.get("fog_height", 0.0)),
                float(post_process.get("fog_height_density", 0.0)),
                0.0, 0.0,
            ))

        if tilemap_layers:
            parts.append(struct.pack("<I", len(tilemap_layers)))
            for layer in tilemap_layers:
                tiles = np.ascontiguousarray(layer["tiles"], dtype=TILE_INSTANCE_DTYPE)
                tw, th = layer["tile_size"]
                parts.append(_TILEMAP_LAYER_HEADER.pack(
                    int(layer["tileset_tex"]), len(tiles), float(tw), float(th),
                ))
                parts.append(tiles.tobytes())

        return b"".join(parts)

    @staticmethod
    def deserialize_frame(data: bytes) -> dict[str, Any]:
        """Deserialize a frame back into structured data (for testing).

        Args:
            data: A frame produced by :meth:`serialize_frame`.

        Returns:
            A dict with ``frame_id``, ``flags``, ``viewports``, ``materials``,
            ``lights``, ``draw_groups`` and ``tilemap_layers`` (an empty list
            when ``FLAG_HAS_TILEMAPS`` is clear), plus ``post_process`` when
            ``FLAG_HAS_POST_PROCESS`` is set. All arrays are copies, independent
            of *data*. ``post_process["fog_colour"]`` is the raw 4-float wire
            slot, so its 4th component equals ``fog_density``.

        Raises:
            struct.error: If *data* ends inside a fixed-size header.
            ValueError: If *data* ends inside an array payload.
        """
        read = Scene3DSerializer._read_array
        off = 0

        frame_id, flags, vp_count, light_count, group_count = _HEADER.unpack_from(data, off)
        off += _HEADER.size

        viewports = []
        for _ in range(vp_count):
            x, y, w, h = _VP_RECT.unpack_from(data, off)
            off += _VP_RECT.size
            view_mat = read(data, off, np.float32, 16).reshape(4, 4)
            off += _MAT4_BYTES
            proj_mat = read(data, off, np.float32, 16).reshape(4, 4)
            off += _MAT4_BYTES
            viewports.append({
                "x": x, "y": y, "width": w, "height": h,
                "view_matrix": view_mat, "proj_matrix": proj_mat,
            })

        (mat_count,) = struct.unpack_from("<I", data, off)
        off += 4
        materials = read(data, off, MATERIAL_DTYPE, mat_count)
        off += mat_count * MATERIAL_DTYPE.itemsize

        lights = read(data, off, LIGHT_DTYPE, light_count)
        off += light_count * LIGHT_DTYPE.itemsize

        draw_groups = []
        for _ in range(group_count):
            (mesh_id, idx_count, inst_count, pass_type,
             albedo_tex, normal_tex, mr_tex, emissive_tex, ao_tex) = _GROUP_HEADER.unpack_from(data, off)
            off += _GROUP_HEADER.size
            transforms = read(data, off, np.float32, inst_count * 16).reshape(inst_count, 4, 4)
            off += inst_count * _MAT4_BYTES
            mat_ids = read(data, off, np.uint32, inst_count)
            off += inst_count * 4
            draw_groups.append({
                "mesh_id": mesh_id, "index_count": idx_count, "transforms": transforms,
                "material_ids": mat_ids, "pass_type": pass_type,
                "albedo_tex": albedo_tex, "normal_tex": normal_tex,
                "metallic_roughness_tex": mr_tex, "emissive_tex": emissive_tex, "ao_tex": ao_tex,
            })

        result: dict[str, Any] = {
            "frame_id": frame_id, "flags": flags, "viewports": viewports,
            "materials": materials, "lights": lights, "draw_groups": draw_groups,
        }

        if flags & FLAG_HAS_POST_PROCESS:
            bloom_en, bloom_thresh, bloom_int, bloom_knee = _POST_PROCESS_BLOOM.unpack_from(data, off)
            off += _POST_PROCESS_BLOOM.size
            fog = _POST_PROCESS_FOG.unpack_from(data, off)
            off += _POST_PROCESS_FOG.size
            result["post_process"] = {
                "bloom_enabled": bool(bloom_en),
                "bloom_threshold": bloom_thresh,
                "bloom_intensity": bloom_int,
                "bloom_soft_knee": bloom_knee,
                # Raw wire slot: rgb followed by density (also exposed below).
                "fog_colour": (fog[0], fog[1], fog[2], fog[3]),
                "fog_density": fog[3],
                "fog_start": fog[4],
                "fog_end": fog[5],
                "fog_mode": int(fog[6]),
                "fog_enabled": bool(fog[7]),
                "fog_height": fog[8],
                "fog_height_density": fog[9],
            }

        tilemap_layers: list[dict[str, Any]] = []
        if flags & FLAG_HAS_TILEMAPS:
            (layer_count,) = struct.unpack_from("<I", data, off)
            off += 4
            for _ in range(layer_count):
                tex, tile_count, tw, th = _TILEMAP_LAYER_HEADER.unpack_from(data, off)
                off += _TILEMAP_LAYER_HEADER.size
                tiles = read(data, off, TILE_INSTANCE_DTYPE, tile_count)
                off += tile_count * TILE_INSTANCE_DTYPE.itemsize
                tilemap_layers.append({
                    "tileset_tex": tex, "tile_size": (tw, th), "tiles": tiles,
                })
        result["tilemap_layers"] = tilemap_layers

        return result