"""Volumetric fog pass — distance-based and height-based fog via compute shader."""
import logging
from typing import Any
import numpy as np
import vulkan as vk
from ..gpu.descriptors import (
DescriptorWriteBatch,
allocate_descriptor_set,
create_descriptor_set_layout,
create_pool_for_types,
)
from ..gpu.pipeline_compute import create_compute_pipeline
from .pass_helpers import create_nearest_sampler
__all__ = ["FogPass"]
log = logging.getLogger(__name__)
# Push constant: mat4(64) + vec4(16) + vec4(16) + vec4(16) + vec4(16) = 128 bytes
_PC_SIZE = 128
[docs]
class FogPass:
"""Compute-based volumetric fog: blends scene colour with fog based on depth.
Supports linear, exponential, and exponential-squared distance fog,
plus optional height-based fog falloff. Operates in-place on the HDR
colour image between the scene render and tone mapping.
"""
def __init__(self, engine: Any):
self._engine = engine
self._ready = False
# Compute pipeline
self._pipeline: Any = None
self._layout: Any = None
self._module: Any = None
# Descriptors
self._desc_pool: Any = None
self._desc_layout: Any = None
self._desc_set: Any = None
self._depth_sampler: Any = None
# Colour image handle (for layout transitions)
self._colour_image: Any = None
# Dimensions
self._width: int = 0
self._height: int = 0
# Public settings
self.enabled: bool = False
self.colour: tuple[float, float, float] = (0.7, 0.8, 0.9)
self.density: float = 0.01
self.start: float = 10.0
self.end: float = 100.0
self.mode: str = "exponential" # "linear", "exponential", "exponential_squared"
self.height_enabled: bool = False
self.height_density: float = 0.1
self.height_falloff: float = 1.0
[docs]
def setup(self, width: int, height: int, depth_view: Any, colour_view: Any, colour_image: Any = None) -> None:
"""Initialize fog compute pipeline and descriptors."""
self._width = width
self._height = height
self._colour_image = colour_image
self._create_sampler()
self._create_descriptors(depth_view, colour_view)
self._create_pipeline()
self._ready = True
log.debug("Fog pass initialized (%dx%d)", width, height)
def _create_sampler(self) -> None:
"""Create nearest-filter sampler for depth texture."""
self._depth_sampler = create_nearest_sampler(self._engine.ctx.device)
def _create_descriptors(self, depth_view: Any, colour_view: Any) -> None:
"""Create descriptor set: depth sampler (binding 0) + colour storage image (binding 1)."""
device = self._engine.ctx.device
cs = vk.VK_SHADER_STAGE_COMPUTE_BIT
self._desc_layout = create_descriptor_set_layout(device, [
(0, vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, cs, 1),
(1, vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE, cs, 1),
])
self._desc_pool = create_pool_for_types(device, {
vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER: 1,
vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE: 1,
})
self._desc_set = allocate_descriptor_set(device, self._desc_pool, self._desc_layout)
self._write_descriptors(depth_view, colour_view)
def _write_descriptors(self, depth_view: Any, colour_view: Any) -> None:
"""Write depth sampler and colour storage image to descriptor set."""
with DescriptorWriteBatch(self._engine.ctx.device) as batch:
batch.image(
self._desc_set, 0, depth_view, self._depth_sampler,
image_layout=vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL,
)
batch.storage_image(self._desc_set, 1, colour_view)
def _create_pipeline(self) -> None:
"""Create fog compute pipeline."""
e = self._engine
self._pipeline, self._layout, self._module = create_compute_pipeline(
e.ctx.device, e.shader_dir / "fog.comp", [self._desc_layout], _PC_SIZE,
)
[docs]
def render(self, cmd: Any, inv_vp: np.ndarray, camera_y: float = 0.0) -> None:
"""Dispatch fog compute shader. Call after HDR pass ends, before tonemap.
Args:
cmd: Active command buffer (outside any render pass).
inv_vp: Inverse view-projection matrix (row-major numpy).
camera_y: Camera world Y position for height fog.
"""
if not self._ready or not self.enabled:
return
ffi = vk.ffi
groups_x = (self._width + 7) // 8
groups_y = (self._height + 7) // 8
# Transition colour image: SHADER_READ_ONLY_OPTIMAL -> GENERAL for storage access
if self._colour_image:
img_barrier = vk.VkImageMemoryBarrier(
srcAccessMask=vk.VK_ACCESS_SHADER_READ_BIT,
dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT,
oldLayout=vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
newLayout=vk.VK_IMAGE_LAYOUT_GENERAL,
srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
image=self._colour_image,
subresourceRange=vk.VkImageSubresourceRange(
aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
baseMipLevel=0, levelCount=1, baseArrayLayer=0, layerCount=1,
),
)
vk.vkCmdPipelineBarrier(
cmd,
vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT | vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
0, 0, None, 0, None, 1, [img_barrier],
)
vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_COMPUTE, self._pipeline)
vk.vkCmdBindDescriptorSets(
cmd,
vk.VK_PIPELINE_BIND_POINT_COMPUTE,
self._layout,
0,
1,
[self._desc_set],
0,
None,
)
# Fog mode encoding: 0=linear, 1=exponential, 2=exponential_squared
mode_map = {"linear": 0.0, "exponential": 1.0, "exponential_squared": 2.0}
mode_val = mode_map.get(self.mode, 1.0)
# Pack push constants: mat4(64) + 4*vec4(64) = 128 bytes
inv_vp_t = np.ascontiguousarray(inv_vp.T, dtype=np.float32)
fog_colour = np.array([*self.colour, self.density], dtype=np.float32)
fog_params = np.array([self.start, self.end, mode_val, 0.0], dtype=np.float32)
height_params = np.array(
[
1.0 if self.height_enabled else 0.0,
self.height_density,
self.height_falloff,
camera_y,
],
dtype=np.float32,
)
resolution = np.array(
[
float(self._width),
float(self._height),
1.0 / self._width,
1.0 / self._height,
],
dtype=np.float32,
)
pc_data = inv_vp_t.tobytes() + fog_colour.tobytes() + fog_params.tobytes()
pc_data += height_params.tobytes() + resolution.tobytes()
cbuf = ffi.new("char[]", pc_data)
vk._vulkan.lib.vkCmdPushConstants(
cmd,
self._layout,
vk.VK_SHADER_STAGE_COMPUTE_BIT,
0,
_PC_SIZE,
cbuf,
)
vk.vkCmdDispatch(cmd, groups_x, groups_y, 1)
# Transition colour image: GENERAL -> SHADER_READ_ONLY_OPTIMAL for subsequent passes
if self._colour_image:
img_barrier = vk.VkImageMemoryBarrier(
srcAccessMask=vk.VK_ACCESS_SHADER_WRITE_BIT,
dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT,
oldLayout=vk.VK_IMAGE_LAYOUT_GENERAL,
newLayout=vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
image=self._colour_image,
subresourceRange=vk.VkImageSubresourceRange(
aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
baseMipLevel=0, levelCount=1, baseArrayLayer=0, layerCount=1,
),
)
vk.vkCmdPipelineBarrier(
cmd,
vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT | vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT,
0, 0, None, 0, None, 1, [img_barrier],
)
else:
# Fallback: memory-only barrier when image handle not available
barrier = vk.VkMemoryBarrier(
srcAccessMask=vk.VK_ACCESS_SHADER_WRITE_BIT,
dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT,
)
vk.vkCmdPipelineBarrier(
cmd,
vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT | vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT,
0, 1, [barrier], 0, None, 0, None,
)
[docs]
def resize(self, width: int, height: int, depth_view: Any, colour_view: Any, colour_image: Any = None) -> None:
"""Update descriptors for new dimensions."""
if not self._ready:
return
self._width = width
self._height = height
if colour_image is not None:
self._colour_image = colour_image
self._write_descriptors(depth_view, colour_view)
[docs]
def cleanup(self) -> None:
"""Release all GPU resources."""
if not self._ready:
return
device = self._engine.ctx.device
if self._pipeline:
vk.vkDestroyPipeline(device, self._pipeline, None)
if self._layout:
vk.vkDestroyPipelineLayout(device, self._layout, None)
if self._module:
vk.vkDestroyShaderModule(device, self._module, None)
if self._desc_pool:
vk.vkDestroyDescriptorPool(device, self._desc_pool, None)
if self._desc_layout:
vk.vkDestroyDescriptorSetLayout(device, self._desc_layout, None)
if self._depth_sampler:
vk.vkDestroySampler(device, self._depth_sampler, None)
self._ready = False