"""
Audio system — background music, UI sounds, and 3D spatial audio.
This module provides:
- AudioStream: Audio resource (WAV/OGG data)
- AudioStreamPlayer: Background music/UI sounds
- AudioStreamPlayer2D: 2D positional audio with panning
- AudioStreamPlayer3D: 3D spatial audio with attenuation
Public API::
from simvx.core import AudioStream, AudioStreamPlayer, Resource
# Filesystem audio file
player = AudioStreamPlayer(stream="music/theme.ogg", autoplay=True)
# Asset shipped inside a Python package
sfx = AudioStreamPlayer2D(stream=Resource("game.assets", "explosion.wav"))
sfx.play()
# Synthetic procedural tone
beep = AudioStreamPlayer(stream=AudioStream.tone(440))
beep.play()
"""
from __future__ import annotations
import logging
import math
import os
from importlib.resources.abc import Traversable
from typing import TYPE_CHECKING, Any, Union
import numpy as np
from .descriptors import Property
from .math.types import Vec2, Vec3, clamp
from .node import Node
from .nodes_2d.node2d import Node2D
from .nodes_3d.node3d import Node3D
from .properties import Colour
if TYPE_CHECKING:
from .resource import Resource
# Sample rate / channel count must match the audio backend.
_SAMPLE_RATE = 44100
_NCHANNELS = 2
AudioSource = Union[str, os.PathLike, "Resource", Traversable]
log = logging.getLogger(__name__)
__all__ = [
"AudioStream",
"AudioStreamPlayer",
"AudioStreamPlayer2D",
"AudioStreamPlayer3D",
"AudioListener",
]
# ============================================================================
# AudioStream — Audio resource
# ============================================================================
[docs]
class AudioStream:
"""Audio resource (WAV/OGG file or synthetic PCM).
This is a lightweight handle to audio data. Actual decoding is deferred
to the backend (miniaudio, SDL3, web audio).
Accepts any of:
- :class:`str` / :class:`os.PathLike` -- a filesystem audio file.
- :class:`~simvx.core.Resource` -- audio inside a Python package.
- :class:`importlib.resources.abc.Traversable` -- the raw return of
``importlib.resources.files(pkg) / name``.
Use :meth:`tone` for procedural sine-wave tones and :meth:`from_pcm` to
wrap pre-rendered PCM data.
Attributes:
source: Original spec the stream was constructed from -- a string,
:class:`pathlib.Path`, :class:`Resource`, or :class:`Traversable`.
Preserved verbatim so scene serialisation can round-trip it.
path: Resolved filesystem path string used by the backend (empty
string for synthetic streams that have no file).
backend_data: Backend-specific audio data (PCM ndarray, channel
id, etc). Set automatically when decoded; may also be set by
:meth:`from_pcm` / :meth:`tone`.
"""
__slots__ = ("source", "path", "backend_data")
def __init__(self, source: AudioSource):
from .resource import Resource
if isinstance(source, Resource):
self.source: Any = source
self.path = str(source.path)
elif isinstance(source, Traversable) and not isinstance(source, (str, os.PathLike)):
self.source = source
from .asset_resolver import _resolve_traversable
self.path = str(_resolve_traversable(source))
elif isinstance(source, (str, os.PathLike)):
spec = os.fspath(source)
if not isinstance(spec, str):
raise TypeError(f"AudioStream path must decode to str, got {type(spec).__name__}")
self.source = source
self.path = spec
else:
raise TypeError(
"AudioStream accepts str | os.PathLike | Resource | Traversable, "
f"got {type(source).__name__}"
)
self.backend_data: Any = None # Backend-specific data
[docs]
def __repr__(self):
return f"AudioStream({self.path!r})"
# ------------------------------------------------------------------
# Constructors for synthetic streams
# ------------------------------------------------------------------
[docs]
@classmethod
def tone(
cls,
freq_hz: float,
*,
duration: float = 1.0,
volume: float = 0.3,
sample_rate: int = _SAMPLE_RATE,
) -> AudioStream:
"""Generate a sine-wave tone at *freq_hz* with a short fade-in/out.
The resulting stream has its PCM data baked into ``backend_data`` so
the audio backend skips file decoding entirely.
"""
if freq_hz <= 0:
raise ValueError(f"tone freq_hz must be > 0, got {freq_hz}")
if duration <= 0:
raise ValueError(f"tone duration must be > 0, got {duration}")
n_frames = int(sample_rate * duration)
t = np.linspace(0.0, duration, n_frames, dtype=np.float32)
fade_frames = min(int(sample_rate * 0.02), n_frames // 4)
envelope = np.ones(n_frames, dtype=np.float32)
if fade_frames > 0:
envelope[:fade_frames] = np.linspace(0.0, 1.0, fade_frames, dtype=np.float32)
envelope[-fade_frames:] = np.linspace(1.0, 0.0, fade_frames, dtype=np.float32)
mono = (np.sin(2 * math.pi * freq_hz * t) * volume * envelope).astype(np.float32)
stereo = np.empty(n_frames * _NCHANNELS, dtype=np.float32)
stereo[0::_NCHANNELS] = mono
stereo[1::_NCHANNELS] = mono
return cls.from_pcm(stereo, name=f"tone_{int(freq_hz)}Hz")
[docs]
@classmethod
def from_pcm(cls, samples: np.ndarray, *, name: str = "pcm") -> AudioStream:
"""Wrap a pre-rendered PCM buffer as an AudioStream.
*samples* is a float32 ndarray, interleaved stereo (channels first
within each frame). *name* is a descriptive label used only in
:meth:`__repr__` and as the stream's ``path`` (which the backend
ignores when ``backend_data`` is set).
"""
if not isinstance(samples, np.ndarray):
raise TypeError(f"from_pcm samples must be a numpy ndarray, got {type(samples).__name__}")
stream = cls.__new__(cls)
stream.source = name
stream.path = name
stream.backend_data = samples.astype(np.float32, copy=False)
return stream
# ============================================================================
# AudioListener — Tracks camera position for 3D audio
# ============================================================================
[docs]
class AudioListener:
"""Singleton that tracks the active camera for 3D audio spatialization.
The listener position is automatically updated from Camera3D or Camera2D
by the backend's audio system.
"""
_instance: AudioListener | None = None
def __init__(self):
self.position_3d: Vec3 = Vec3()
self.position_2d: Vec2 = Vec2()
self.forward: Vec3 = Vec3(0, 0, -1) # Camera forward vector
self.up: Vec3 = Vec3(0, 1, 0)
[docs]
@classmethod
def get(cls) -> AudioListener:
if cls._instance is None:
cls._instance = cls()
return cls._instance
[docs]
@classmethod
def reset(cls):
"""Reset singleton (useful for tests)."""
cls._instance = None
# ============================================================================
# _AudioPlaybackMixin — Shared play/stop/pause/is_playing logic
# ============================================================================
class _AudioPlaybackMixin:
"""Mixin providing common audio playback state and backend interaction.
Subclasses must set self.stream, self._playing, self._paused,
self._backend_channel in __init__, and define volume_db, pitch_scale,
bus, autoplay, loop Settings.
"""
def _init_playback(self, stream: AudioSource | AudioStream | None):
self.stream: AudioStream | None = None
if stream is not None:
self.stream = stream if isinstance(stream, AudioStream) else AudioStream(stream)
self._playing: bool = False
self._paused: bool = False
self._backend_channel: Any = None
# Suppresses on_change-driven backend pushes while ``set_pan_and_gain``
# writes both Properties so the dual-write lands as one backend call.
self._suppress_audio_push: bool = False
def _get_backend(self):
tree = getattr(self, "_tree", None)
return getattr(tree, "_audio_backend", None) if tree else None
# ------------------------------------------------------------------
# Live property push — shared on_change handler
# ------------------------------------------------------------------
def _push_audio_state(self, backend, channel) -> None:
"""Push current volume/pan/pitch through ``backend`` for ``channel``.
Subclasses override this to supply spatial pan/pitch. The base
non-positional player sends pan=0.
"""
backend.update_audio_2d(channel, self.volume_db, 0.0)
def _on_volume_db_changed(self) -> None:
"""Push live volume change to the active backend channel.
No-op while not playing or before the backend wires up; tolerates
partial state because Property on_change can fire during ``__init__``.
"""
if getattr(self, "_suppress_audio_push", False):
return
backend = self._get_backend()
channel = getattr(self, "_backend_channel", None)
if backend is None or channel is None:
return
self._push_audio_state(backend, channel)
def stop(self):
"""Stop playback."""
self._playing = False
self._paused = False
backend = self._get_backend()
if backend and self._backend_channel is not None:
backend.stop_audio(self._backend_channel)
self._backend_channel = None
def pause(self):
"""Pause playback. Call play() to resume."""
if not self._playing:
return
self._paused = True
self._playing = False
backend = self._get_backend()
if backend and self._backend_channel is not None:
backend.pause_audio(self._backend_channel)
def is_playing(self) -> bool:
"""Check if audio is currently playing."""
return self._playing and not self._paused
def is_paused(self) -> bool:
"""Check if audio is currently paused (call ``play()`` to resume)."""
return self._paused
def _resume_if_paused(self) -> bool:
"""If paused, resume playback and return True; otherwise return False."""
if not self._paused or self._backend_channel is None:
return False
self._playing = True
self._paused = False
backend = self._get_backend()
if backend:
backend.resume_audio(self._backend_channel)
return True
def _play_common(self, from_position: float = 0.0) -> bool:
"""Common play() preamble. Returns True if playback should proceed."""
if not self.stream:
return False
if from_position == 0.0 and self._resume_if_paused():
return False
self._playing = True
self._paused = False
return True
def _autoplay_check(self):
"""Start playback if autoplay is enabled. Call from ready()."""
if self.autoplay and self.stream:
self.play()
def _exit_tree(self):
super()._exit_tree()
self.stop()
# ============================================================================
# AudioStreamPlayer — Background music / UI sounds
# ============================================================================
[docs]
class AudioStreamPlayer(_AudioPlaybackMixin, Node):
"""Non-positional audio player for background music and UI sounds.
This player does not use 3D positioning — volume is constant regardless
of camera position. Use AudioStreamPlayer2D or AudioStreamPlayer3D for
spatial audio.
Settings:
volume_db: Volume in decibels (-80 to 24). 0 = full volume.
pitch_scale: Playback speed multiplier (0.5 to 2.0).
bus: Audio bus name ("master", "music", "sfx", etc).
autoplay: Start playing when added to scene tree.
loop: Loop playback when finished.
stream_mode: "memory" loads entire file; "streaming" reads in chunks.
buffer_size: Chunk size in bytes for streaming mode (default 64KB).
"""
volume_db = Property(
0.0,
range=(-80.0, 24.0),
hint="Volume in decibels",
group="Playback",
on_change="_on_volume_db_changed",
)
pitch_scale = Property(1.0, range=(0.5, 2.0), hint="Playback speed", group="Playback")
bus = Property("master", enum=["master", "music", "sfx", "ui"], hint="Audio bus", group="Playback")
autoplay = Property(False, hint="Auto-play on ready", group="Playback")
loop = Property(False, hint="Loop playback", group="Playback")
stream_mode = Property("memory", enum=["memory", "streaming"], hint="Load mode", group="Playback")
buffer_size = Property(65536, range=(4096, 524288), hint="Streaming buffer size", group="Playback")
def __init__(self, stream: AudioSource | AudioStream | None = None, **kwargs):
super().__init__(**kwargs)
self._init_playback(stream)
self._stream_file: Any = None # Open file handle for streaming mode
self._stream_offset: int = 0 # Current read offset in file
self._stream_data_offset: int = 0 # Byte offset past file header (e.g. WAV header)
[docs]
def ready(self):
self._autoplay_check()
[docs]
def process(self, delta: float):
"""Feed audio chunks to backend in streaming mode."""
if self.stream_mode == "streaming" and self._playing and not self._paused:
self._process_streaming()
def _process_streaming(self):
"""Read next chunk from file and feed to audio backend."""
if not self._stream_file or not self._tree:
return
backend = self._get_backend()
if not backend:
return
chunk = self._stream_file.read(self.buffer_size)
if not chunk:
if self.loop:
self._stream_file.seek(self._stream_data_offset)
chunk = self._stream_file.read(self.buffer_size)
if not chunk:
self.stop()
return
backend.feed_audio_chunk(self._backend_channel, chunk)
[docs]
def play(self, from_position: float = 0.0):
"""Start or resume playback.
Args:
from_position: Start position in seconds (0.0 = beginning).
"""
if not self._play_common(from_position):
return
# Open file for streaming mode
if self.stream_mode == "streaming" and self.stream.path:
if self._stream_file:
self._stream_file.close()
self._stream_file = None
try:
self._stream_file = open(self.stream.path, "rb") # noqa: SIM115
self._stream_data_offset = 0
header = self._stream_file.read(4)
if header == b"RIFF":
self._stream_file.seek(44) # Standard WAV header size
self._stream_data_offset = 44
else:
self._stream_file.seek(0)
except OSError:
log.warning("Failed to open audio stream file: %s", self.stream.path)
self._stream_file = None
backend = self._get_backend()
if backend:
if self.stream_mode == "streaming":
self._backend_channel = backend.open_stream(
volume_db=self.volume_db,
pitch=self.pitch_scale,
bus=self.bus,
)
else:
self._backend_channel = backend.play_audio(
self.stream,
volume_db=self.volume_db,
pitch=self.pitch_scale,
loop=self.loop,
bus=self.bus,
)
[docs]
def stop(self):
"""Stop playback and reset position to beginning."""
if self._stream_file:
self._stream_file.close()
self._stream_file = None
super().stop()
[docs]
def get_playback_position(self) -> float:
"""Get current playback position in seconds."""
backend = self._get_backend()
if backend and self._backend_channel is not None:
return backend.get_playback_position(self._backend_channel)
return 0.0
# ============================================================================
# AudioStreamPlayer2D — 2D positional audio
# ============================================================================
[docs]
class AudioStreamPlayer2D(_AudioPlaybackMixin, Node2D):
"""2D positional audio player with stereo panning.
Audio volume and pan are calculated based on distance from the 2D listener
(typically Camera2D position). Left/right panning simulates direction.
Settings:
volume_db: Base volume in decibels (-80 to 24).
pitch_scale: Playback speed multiplier (0.5 to 2.0).
bus: Audio bus name.
autoplay: Start playing when added to scene tree.
loop: Loop playback when finished.
max_distance: Distance at which audio is inaudible (pixels).
attenuation: Distance attenuation exponent (1.0 = linear, 2.0 = inverse square).
"""
volume_db = Property(
0.0,
range=(-80.0, 24.0),
hint="Volume in decibels",
group="Playback",
on_change="_on_volume_db_changed",
)
pitch_scale = Property(1.0, range=(0.5, 2.0), hint="Playback speed", group="Playback")
bus = Property("sfx", enum=["master", "music", "sfx", "ui"], hint="Audio bus", group="Playback")
autoplay = Property(False, hint="Auto-play on ready", group="Playback")
loop = Property(False, hint="Loop playback", group="Playback")
max_distance = Property(2000.0, range=(1.0, 10000.0), hint="Max hearing distance (pixels)", group="Spatial")
attenuation = Property(1.0, range=(0.1, 4.0), hint="Distance attenuation exponent", group="Spatial")
pan_override = Property(
None,
hint="Override positional pan; None = use world position",
group="Playback",
on_change="_on_pan_override_changed",
)
gizmo_colour = Colour((0.6, 0.4, 1.0, 0.5))
[docs]
def get_gizmo_lines(self) -> list[tuple[Vec2, Vec2]]:
"""Return circle showing the audio range."""
from .physics_nodes import _circle_lines_2d
p = self.world_position
return _circle_lines_2d(p.x, p.y, float(self.max_distance), 32)
def __init__(self, stream: AudioSource | AudioStream | None = None, **kwargs):
super().__init__(**kwargs)
self._init_playback(stream)
[docs]
def ready(self):
self._autoplay_check()
# ------------------------------------------------------------------
# Spatial helpers
# ------------------------------------------------------------------
def _attenuated_volume_db(self, distance: float) -> float:
"""Return distance-attenuated volume_db for a given listener distance."""
if distance > self.max_distance:
return -80.0
# volume_db = base - 80 * (dist / max_dist) ^ attenuation
dist_ratio = distance / self.max_distance
return self.volume_db - 80.0 * (dist_ratio**self.attenuation)
def _compute_positional_pan(self) -> float:
"""Compute the positional stereo pan (-1=left, 0=center, 1=right)."""
listener = AudioListener.get()
dx = self.world_position.x - listener.position_2d.x
return clamp(dx / self.max_distance, -1.0, 1.0)
def _push_audio_state(self, backend, channel) -> None:
"""Send distance-attenuated volume + (override or positional) pan."""
listener = AudioListener.get()
distance = (self.world_position - listener.position_2d).length()
volume = self._attenuated_volume_db(distance)
pan = self.pan_override if self.pan_override is not None else self._compute_positional_pan()
backend.update_audio_2d(channel, volume, pan)
def _on_pan_override_changed(self) -> None:
"""Push the new pan to the active backend channel."""
if getattr(self, "_suppress_audio_push", False):
return
backend = self._get_backend()
channel = getattr(self, "_backend_channel", None)
if backend is None or channel is None:
return
self._push_audio_state(backend, channel)
[docs]
def set_pan_and_gain(self, pan: float, gain_db: float) -> None:
"""Atomically set ``pan_override`` + ``volume_db`` with one backend call.
Use when both must land on the same audio frame (e.g. fade-out before
retrigger). Setting them as separate Property assignments would fire
two backend updates.
"""
self._suppress_audio_push = True
try:
self.pan_override = pan
self.volume_db = gain_db
finally:
self._suppress_audio_push = False
backend = self._get_backend()
channel = getattr(self, "_backend_channel", None)
if backend is not None and channel is not None:
self._push_audio_state(backend, channel)
[docs]
def process(self, delta: float):
"""Update 2D spatialization each frame."""
if not self._playing or self._paused:
return
backend = self._get_backend()
if backend and self._backend_channel is not None:
self._push_audio_state(backend, self._backend_channel)
[docs]
def play(self, from_position: float = 0.0):
"""Start or resume playback."""
if not self._play_common(from_position):
return
backend = self._get_backend()
if backend:
self._backend_channel = backend.play_audio_2d(
self.stream,
position=self.world_position,
volume_db=self.volume_db,
pitch=self.pitch_scale,
loop=self.loop,
bus=self.bus,
max_distance=self.max_distance,
)
# ============================================================================
# AudioStreamPlayer3D — 3D spatial audio
# ============================================================================
[docs]
class AudioStreamPlayer3D(_AudioPlaybackMixin, Node3D):
"""3D spatial audio player with distance attenuation and directional panning.
Audio volume and stereo panning are calculated based on distance and
direction from the 3D listener (typically Camera3D position/orientation).
Settings:
volume_db: Base volume in decibels (-80 to 24).
pitch_scale: Playback speed multiplier (0.5 to 2.0).
bus: Audio bus name.
autoplay: Start playing when added to scene tree.
loop: Loop playback when finished.
max_distance: Distance at which audio is inaudible (world units).
attenuation: Distance attenuation exponent (1.0 = linear, 2.0 = inverse square).
doppler_scale: Doppler effect strength (0.0 = off, 1.0 = realistic).
"""
volume_db = Property(
0.0,
range=(-80.0, 24.0),
hint="Volume in decibels",
group="Playback",
on_change="_on_volume_db_changed",
)
pitch_scale = Property(1.0, range=(0.5, 2.0), hint="Playback speed", group="Playback")
bus = Property("sfx", enum=["master", "music", "sfx", "ui"], hint="Audio bus", group="Playback")
autoplay = Property(False, hint="Auto-play on ready", group="Playback")
loop = Property(False, hint="Loop playback", group="Playback")
max_distance = Property(100.0, range=(1.0, 1000.0), hint="Max hearing distance", group="Spatial")
attenuation = Property(1.0, range=(0.1, 4.0), hint="Distance attenuation exponent", group="Spatial")
doppler_scale = Property(0.0, range=(0.0, 4.0), hint="Doppler effect strength", group="Spatial")
pan_override = Property(
None,
hint="Override directional pan; None = use world position",
group="Playback",
on_change="_on_pan_override_changed",
)
gizmo_colour = Colour((0.6, 0.4, 1.0, 0.5))
[docs]
def get_gizmo_lines(self) -> list[tuple[Vec3, Vec3]]:
"""Return 3 circles showing the audio range sphere."""
from .physics_nodes import _circle_lines_3d
p = self.world_position
r = float(self.max_distance)
lines: list[tuple[Vec3, Vec3]] = []
lines.extend(_circle_lines_3d(p, Vec3(1, 0, 0), Vec3(0, 1, 0), r))
lines.extend(_circle_lines_3d(p, Vec3(1, 0, 0), Vec3(0, 0, 1), r))
lines.extend(_circle_lines_3d(p, Vec3(0, 1, 0), Vec3(0, 0, 1), r))
return lines
def __init__(self, stream: AudioSource | AudioStream | None = None, **kwargs):
super().__init__(**kwargs)
self._init_playback(stream)
self._prev_position: Vec3 = Vec3() # For Doppler
[docs]
def ready(self):
self._prev_position = self.world_position
self._autoplay_check()
# ------------------------------------------------------------------
# Spatial helpers
# ------------------------------------------------------------------
def _compute_3d_state(self, delta: float) -> tuple[float, float, float]:
"""Return (volume_db, pan, pitch) for the current frame.
``delta`` drives Doppler. Pass 0.0 from on_change handlers (no
Doppler kick on a Property nudge — Doppler reapplies next frame).
``_prev_position`` is left untouched here; ``process()`` is the
sole owner of that state to keep Doppler velocity stable.
"""
listener = AudioListener.get()
to_source = self.world_position - listener.position_3d
distance = to_source.length()
# Distance attenuation
if distance > self.max_distance:
volume = -80.0
else:
d = max(distance, 0.1) # avoid divide-by-zero at the listener
dist_ratio = d / self.max_distance
volume = self.volume_db - 80.0 * (dist_ratio**self.attenuation)
# Pan: positional unless caller has overridden it
if self.pan_override is not None:
pan = float(self.pan_override)
elif distance > 0.01:
to_source_norm = to_source / distance
right = listener.forward.cross(listener.up)
pan = clamp(to_source_norm.dot(right), -1.0, 1.0)
else:
pan = 0.0
# Doppler — pitch shift from radial velocity
pitch = self.pitch_scale
if self.doppler_scale > 0.0 and delta > 0.0:
velocity = (self.world_position - self._prev_position) / delta
velocity_towards = velocity.dot(-to_source) / (distance if distance > 0.1 else 0.1)
speed_of_sound = 343.0
doppler_factor = 1.0 + (velocity_towards / speed_of_sound) * self.doppler_scale
pitch = self.pitch_scale * clamp(doppler_factor, 0.5, 2.0)
return volume, pan, pitch
def _push_audio_state(self, backend, channel) -> None:
"""Send current 3D volume/pan/pitch (without advancing Doppler velocity)."""
volume, pan, pitch = self._compute_3d_state(0.0)
backend.update_audio_3d(channel, volume, pan, pitch)
def _on_pan_override_changed(self) -> None:
if getattr(self, "_suppress_audio_push", False):
return
backend = self._get_backend()
channel = getattr(self, "_backend_channel", None)
if backend is None or channel is None:
return
self._push_audio_state(backend, channel)
[docs]
def set_pan_and_gain(self, pan: float, gain_db: float) -> None:
"""Atomically set ``pan_override`` + ``volume_db`` with one backend call."""
self._suppress_audio_push = True
try:
self.pan_override = pan
self.volume_db = gain_db
finally:
self._suppress_audio_push = False
backend = self._get_backend()
channel = getattr(self, "_backend_channel", None)
if backend is not None and channel is not None:
self._push_audio_state(backend, channel)
[docs]
def process(self, delta: float):
"""Update 3D spatialization each frame."""
if not self._playing or self._paused:
return
volume, pan, pitch = self._compute_3d_state(delta)
self._prev_position = Vec3(self.world_position)
backend = self._get_backend()
if backend and self._backend_channel is not None:
backend.update_audio_3d(self._backend_channel, volume, pan, pitch)
[docs]
def play(self, from_position: float = 0.0):
"""Start or resume playback."""
if not self._play_common(from_position):
return
self._prev_position = self.world_position
backend = self._get_backend()
if backend:
self._backend_channel = backend.play_audio_3d(
self.stream,
position=self.world_position,
volume_db=self.volume_db,
pitch=self.pitch_scale,
loop=self.loop,
bus=self.bus,
max_distance=self.max_distance,
)