Source code for simvx.graphics.renderer.fog_pass

"""Volumetric fog pass — distance-based and height-based fog via compute shader."""

import logging
from typing import Any

import numpy as np
import vulkan as vk

from ..gpu.descriptors import (
    DescriptorWriteBatch,
    allocate_descriptor_set,
    create_descriptor_set_layout,
    create_pool_for_types,
)
from ..gpu.pipeline_compute import create_compute_pipeline
from .pass_helpers import create_nearest_sampler

__all__ = ["FogPass"]

log = logging.getLogger(__name__)

# Push constant: mat4(64) + vec4(16) + vec4(16) + vec4(16) + vec4(16) = 128 bytes
_PC_SIZE = 128

[docs] class FogPass: """Compute-based volumetric fog: blends scene colour with fog based on depth. Supports linear, exponential, and exponential-squared distance fog, plus optional height-based fog falloff. Operates in-place on the HDR colour image between the scene render and tone mapping. """ def __init__(self, engine: Any): self._engine = engine self._ready = False # Compute pipeline self._pipeline: Any = None self._layout: Any = None self._module: Any = None # Descriptors self._desc_pool: Any = None self._desc_layout: Any = None self._desc_set: Any = None self._depth_sampler: Any = None # Colour image handle (for layout transitions) self._colour_image: Any = None # Dimensions self._width: int = 0 self._height: int = 0 # Public settings self.enabled: bool = False self.colour: tuple[float, float, float] = (0.7, 0.8, 0.9) self.density: float = 0.01 self.start: float = 10.0 self.end: float = 100.0 self.mode: str = "exponential" # "linear", "exponential", "exponential_squared" self.height_enabled: bool = False self.height_density: float = 0.1 self.height_falloff: float = 1.0
[docs] def setup(self, width: int, height: int, depth_view: Any, colour_view: Any, colour_image: Any = None) -> None: """Initialize fog compute pipeline and descriptors.""" self._width = width self._height = height self._colour_image = colour_image self._create_sampler() self._create_descriptors(depth_view, colour_view) self._create_pipeline() self._ready = True log.debug("Fog pass initialized (%dx%d)", width, height)
def _create_sampler(self) -> None: """Create nearest-filter sampler for depth texture.""" self._depth_sampler = create_nearest_sampler(self._engine.ctx.device) def _create_descriptors(self, depth_view: Any, colour_view: Any) -> None: """Create descriptor set: depth sampler (binding 0) + colour storage image (binding 1).""" device = self._engine.ctx.device cs = vk.VK_SHADER_STAGE_COMPUTE_BIT self._desc_layout = create_descriptor_set_layout(device, [ (0, vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, cs, 1), (1, vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE, cs, 1), ]) self._desc_pool = create_pool_for_types(device, { vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER: 1, vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE: 1, }) self._desc_set = allocate_descriptor_set(device, self._desc_pool, self._desc_layout) self._write_descriptors(depth_view, colour_view) def _write_descriptors(self, depth_view: Any, colour_view: Any) -> None: """Write depth sampler and colour storage image to descriptor set.""" with DescriptorWriteBatch(self._engine.ctx.device) as batch: batch.image( self._desc_set, 0, depth_view, self._depth_sampler, image_layout=vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL, ) batch.storage_image(self._desc_set, 1, colour_view) def _create_pipeline(self) -> None: """Create fog compute pipeline.""" e = self._engine self._pipeline, self._layout, self._module = create_compute_pipeline( e.ctx.device, e.shader_dir / "fog.comp", [self._desc_layout], _PC_SIZE, )
[docs] def render(self, cmd: Any, inv_vp: np.ndarray, camera_y: float = 0.0) -> None: """Dispatch fog compute shader. Call after HDR pass ends, before tonemap. Args: cmd: Active command buffer (outside any render pass). inv_vp: Inverse view-projection matrix (row-major numpy). camera_y: Camera world Y position for height fog. """ if not self._ready or not self.enabled: return ffi = vk.ffi groups_x = (self._width + 7) // 8 groups_y = (self._height + 7) // 8 # Transition colour image: SHADER_READ_ONLY_OPTIMAL -> GENERAL for storage access if self._colour_image: img_barrier = vk.VkImageMemoryBarrier( srcAccessMask=vk.VK_ACCESS_SHADER_READ_BIT, dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT, oldLayout=vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL, newLayout=vk.VK_IMAGE_LAYOUT_GENERAL, srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED, dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED, image=self._colour_image, subresourceRange=vk.VkImageSubresourceRange( aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT, baseMipLevel=0, levelCount=1, baseArrayLayer=0, layerCount=1, ), ) vk.vkCmdPipelineBarrier( cmd, vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT | vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT, vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT, 0, 0, None, 0, None, 1, [img_barrier], ) vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_COMPUTE, self._pipeline) vk.vkCmdBindDescriptorSets( cmd, vk.VK_PIPELINE_BIND_POINT_COMPUTE, self._layout, 0, 1, [self._desc_set], 0, None, ) # Fog mode encoding: 0=linear, 1=exponential, 2=exponential_squared mode_map = {"linear": 0.0, "exponential": 1.0, "exponential_squared": 2.0} mode_val = mode_map.get(self.mode, 1.0) # Pack push constants: mat4(64) + 4*vec4(64) = 128 bytes inv_vp_t = np.ascontiguousarray(inv_vp.T, dtype=np.float32) fog_colour = np.array([*self.colour, self.density], dtype=np.float32) fog_params = np.array([self.start, self.end, mode_val, 0.0], dtype=np.float32) height_params = np.array( [ 1.0 if self.height_enabled else 0.0, self.height_density, self.height_falloff, camera_y, ], dtype=np.float32, ) resolution = np.array( [ float(self._width), float(self._height), 1.0 / self._width, 1.0 / self._height, ], dtype=np.float32, ) pc_data = inv_vp_t.tobytes() + fog_colour.tobytes() + fog_params.tobytes() pc_data += height_params.tobytes() + resolution.tobytes() cbuf = ffi.new("char[]", pc_data) vk._vulkan.lib.vkCmdPushConstants( cmd, self._layout, vk.VK_SHADER_STAGE_COMPUTE_BIT, 0, _PC_SIZE, cbuf, ) vk.vkCmdDispatch(cmd, groups_x, groups_y, 1) # Transition colour image: GENERAL -> SHADER_READ_ONLY_OPTIMAL for subsequent passes if self._colour_image: img_barrier = vk.VkImageMemoryBarrier( srcAccessMask=vk.VK_ACCESS_SHADER_WRITE_BIT, dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT, oldLayout=vk.VK_IMAGE_LAYOUT_GENERAL, newLayout=vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL, srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED, dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED, image=self._colour_image, subresourceRange=vk.VkImageSubresourceRange( aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT, baseMipLevel=0, levelCount=1, baseArrayLayer=0, layerCount=1, ), ) vk.vkCmdPipelineBarrier( cmd, vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT, vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT | vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT, 0, 0, None, 0, None, 1, [img_barrier], ) else: # Fallback: memory-only barrier when image handle not available barrier = vk.VkMemoryBarrier( srcAccessMask=vk.VK_ACCESS_SHADER_WRITE_BIT, dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT, ) vk.vkCmdPipelineBarrier( cmd, vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT, vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT | vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT, 0, 1, [barrier], 0, None, 0, None, )
[docs] def resize(self, width: int, height: int, depth_view: Any, colour_view: Any, colour_image: Any = None) -> None: """Update descriptors for new dimensions.""" if not self._ready: return self._width = width self._height = height if colour_image is not None: self._colour_image = colour_image self._write_descriptors(depth_view, colour_view)
[docs] def cleanup(self) -> None: """Release all GPU resources.""" if not self._ready: return device = self._engine.ctx.device if self._pipeline: vk.vkDestroyPipeline(device, self._pipeline, None) if self._layout: vk.vkDestroyPipelineLayout(device, self._layout, None) if self._module: vk.vkDestroyShaderModule(device, self._module, None) if self._desc_pool: vk.vkDestroyDescriptorPool(device, self._desc_pool, None) if self._desc_layout: vk.vkDestroyDescriptorSetLayout(device, self._desc_layout, None) if self._depth_sampler: vk.vkDestroySampler(device, self._depth_sampler, None) self._ready = False