# Source code for simvx.core.audio

"""
Audio system — background music, UI sounds, and 3D spatial audio.

This module provides:
- AudioStream: Audio resource (WAV/OGG data)
- AudioStreamPlayer: Background music/UI sounds
- AudioStreamPlayer2D: 2D positional audio with panning
- AudioStreamPlayer3D: 3D spatial audio with attenuation

Public API::

    from simvx.core import AudioStream, AudioStreamPlayer, Resource

    # Filesystem audio file
    player = AudioStreamPlayer(stream="music/theme.ogg", autoplay=True)

    # Asset shipped inside a Python package
    sfx = AudioStreamPlayer2D(stream=Resource("game.assets", "explosion.wav"))
    sfx.play()

    # Synthetic procedural tone
    beep = AudioStreamPlayer(stream=AudioStream.tone(440))
    beep.play()
"""

from __future__ import annotations

import logging
import math
import os
from importlib.resources.abc import Traversable
from typing import TYPE_CHECKING, Any, Union

import numpy as np

from .descriptors import Property
from .math.types import Vec2, Vec3, clamp
from .node import Node
from .nodes_2d.node2d import Node2D
from .nodes_3d.node3d import Node3D
from .properties import Colour

if TYPE_CHECKING:
    from .resource import Resource

# Sample rate / channel count must match the audio backend.
_SAMPLE_RATE = 44100
_NCHANNELS = 2

AudioSource = Union[str, os.PathLike, "Resource", Traversable]

log = logging.getLogger(__name__)

__all__ = [
    "AudioStream",
    "AudioStreamPlayer",
    "AudioStreamPlayer2D",
    "AudioStreamPlayer3D",
    "AudioListener",
]

# ============================================================================
# AudioStream — Audio resource
# ============================================================================

class AudioStream:
    """Audio resource (WAV/OGG file or synthetic PCM).

    This is a lightweight handle to audio data. Actual decoding is deferred
    to the backend (miniaudio, SDL3, web audio).

    Accepts any of:

    - :class:`str` / :class:`os.PathLike` -- a filesystem audio file.
    - :class:`~simvx.core.Resource` -- audio inside a Python package.
    - :class:`importlib.resources.abc.Traversable` -- the raw return of
      ``importlib.resources.files(pkg) / name``.

    Use :meth:`tone` for procedural sine-wave tones and :meth:`from_pcm` to
    wrap pre-rendered PCM data.

    Attributes:
        source: Original spec the stream was constructed from -- a string,
            :class:`pathlib.Path`, :class:`Resource`, or
            :class:`Traversable`. Preserved verbatim so scene serialisation
            can round-trip it.
        path: Resolved filesystem path string used by the backend. For
            streams built by :meth:`from_pcm` / :meth:`tone` this holds the
            descriptive label instead of a real file path.
        backend_data: Backend-specific audio data (PCM ndarray, channel id,
            etc). ``None`` after construction from a file spec; set by
            :meth:`from_pcm` / :meth:`tone` for synthetic streams.
    """

    __slots__ = ("source", "path", "backend_data")

    def __init__(self, source: AudioSource):
        """Build a stream handle from a path-like, Resource, or Traversable.

        Raises:
            TypeError: If *source* is none of the accepted kinds, or if a
                path-like object resolves to ``bytes`` rather than ``str``.
        """
        # Imported lazily; at module level the name is only available to
        # type checkers.
        from .resource import Resource

        if isinstance(source, Resource):
            self.source: Any = source
            self.path = str(source.path)
        elif isinstance(source, Traversable) and not isinstance(source, (str, os.PathLike)):
            # Path-like objects are excluded here on purpose so they take the
            # plain filesystem branch below rather than the resolver.
            from .asset_resolver import _resolve_traversable

            self.source = source
            self.path = str(_resolve_traversable(source))
        elif isinstance(source, (str, os.PathLike)):
            spec = os.fspath(source)
            if not isinstance(spec, str):
                # os.fspath() may legally return bytes; the backend wants str.
                raise TypeError(f"AudioStream path must decode to str, got {type(spec).__name__}")
            self.source = source
            self.path = spec
        else:
            raise TypeError(
                "AudioStream accepts str | os.PathLike | Resource | Traversable, "
                f"got {type(source).__name__}"
            )
        self.backend_data: Any = None  # Backend-specific data

    def __repr__(self):
        return f"AudioStream({self.path!r})"

    # ------------------------------------------------------------------
    # Constructors for synthetic streams
    # ------------------------------------------------------------------

    @classmethod
    def tone(
        cls,
        freq_hz: float,
        *,
        duration: float = 1.0,
        volume: float = 0.3,
        sample_rate: int = _SAMPLE_RATE,
    ) -> AudioStream:
        """Generate a sine-wave tone at *freq_hz* with a short fade-in/out.

        The resulting stream has its PCM data baked into ``backend_data`` so
        the audio backend skips file decoding entirely.

        Args:
            freq_hz: Tone frequency in hertz; must be > 0.
            duration: Length in seconds; must be > 0.
            volume: Linear amplitude multiplier applied to the sine wave.
            sample_rate: Frames per second used to render the tone; must
                be > 0.

        Returns:
            A stream labelled ``tone_<freq>Hz`` holding interleaved float32
            samples with the mono signal duplicated into every channel.

        Raises:
            ValueError: If *freq_hz*, *duration* or *sample_rate* is not
                positive.
        """
        if freq_hz <= 0:
            raise ValueError(f"tone freq_hz must be > 0, got {freq_hz}")
        if duration <= 0:
            raise ValueError(f"tone duration must be > 0, got {duration}")
        if sample_rate <= 0:
            raise ValueError(f"tone sample_rate must be > 0, got {sample_rate}")

        n_frames = int(sample_rate * duration)

        # Frame k is played at k / sample_rate seconds. Building the axis
        # that way (instead of an endpoint-inclusive linspace) keeps the
        # rendered frequency exact, and doing the phase math in float64
        # avoids float32 rounding noise on long or high-pitched tones.
        t = np.arange(n_frames, dtype=np.float64) / sample_rate

        # 20 ms linear ramps at both ends to avoid clicks, capped at a
        # quarter of the tone so very short tones still reach full level.
        fade_frames = min(int(sample_rate * 0.02), n_frames // 4)
        envelope = np.ones(n_frames, dtype=np.float64)
        if fade_frames > 0:
            envelope[:fade_frames] = np.linspace(0.0, 1.0, fade_frames)
            envelope[-fade_frames:] = np.linspace(1.0, 0.0, fade_frames)

        mono = (np.sin(2.0 * math.pi * freq_hz * t) * volume * envelope).astype(np.float32)

        # Interleave by repeating each mono sample once per channel.
        interleaved = np.repeat(mono, _NCHANNELS)
        return cls.from_pcm(interleaved, name=f"tone_{int(freq_hz)}Hz")

    @classmethod
    def from_pcm(cls, samples: np.ndarray, *, name: str = "pcm") -> AudioStream:
        """Wrap a pre-rendered PCM buffer as an AudioStream.

        Args:
            samples: PCM samples as a numpy array; converted to float32 when
                it is not already. :meth:`tone` passes a flat, interleaved
                multi-channel buffer. A buffer that is already float32 is
                stored as-is (no copy), so later in-place edits by the
                caller are visible through the stream.
            name: Descriptive label stored as both ``source`` and ``path``;
                it is what :meth:`__repr__` shows.

        Raises:
            TypeError: If *samples* is not a numpy ndarray.
        """
        if not isinstance(samples, np.ndarray):
            raise TypeError(f"from_pcm samples must be a numpy ndarray, got {type(samples).__name__}")
        # Bypass __init__: there is no file spec to validate or resolve.
        stream = cls.__new__(cls)
        stream.source = name
        stream.path = name
        stream.backend_data = samples.astype(np.float32, copy=False)
        return stream
# ============================================================================
# AudioListener — Tracks camera position for 3D audio
# ============================================================================
class AudioListener:
    """Process-wide listener state used to spatialise 2D and 3D audio.

    One shared instance is handed out by :meth:`get`. Its position and
    orientation fields are meant to be refreshed from the active Camera3D or
    Camera2D by the backend's audio system.
    """

    _instance: AudioListener | None = None

    def __init__(self):
        # Listener location in 3D world space and in 2D space.
        self.position_3d: Vec3 = Vec3()
        self.position_2d: Vec2 = Vec2()
        # Orientation basis; defaults to looking down -Z with +Y up.
        self.forward: Vec3 = Vec3(0, 0, -1)  # Camera forward vector
        self.up: Vec3 = Vec3(0, 1, 0)

    @classmethod
    def get(cls) -> AudioListener:
        """Return the shared listener, creating it on first access."""
        listener = cls._instance
        if listener is None:
            listener = cls()
            cls._instance = listener
        return listener

    @classmethod
    def reset(cls):
        """Drop the shared instance so the next :meth:`get` builds a fresh one (handy in tests)."""
        cls._instance = None
# ============================================================================
# _AudioPlaybackMixin — Shared play/stop/pause/is_playing logic
# ============================================================================


class _AudioPlaybackMixin:
    """Mixin providing common audio playback state and backend interaction.

    Subclasses must call :meth:`_init_playback` from ``__init__`` (it sets
    ``stream``, ``_playing``, ``_paused`` and ``_backend_channel``) and must
    define the ``volume_db``, ``pitch_scale``, ``bus``, ``autoplay`` and
    ``loop`` Settings plus a ``play()`` method.
    """

    def _init_playback(self, stream: AudioSource | AudioStream | None):
        """Initialise playback state; wrap *stream* in an AudioStream if needed."""
        self.stream: AudioStream | None = None
        if stream is not None:
            self.stream = stream if isinstance(stream, AudioStream) else AudioStream(stream)
        self._playing: bool = False
        self._paused: bool = False
        # Opaque handle returned by the backend when playback starts.
        self._backend_channel: Any = None
        # Suppresses on_change-driven backend pushes while ``set_pan_and_gain``
        # writes both Properties so the dual-write lands as one backend call.
        self._suppress_audio_push: bool = False

    def _get_backend(self):
        """Return the tree's ``_audio_backend``, or None when not in a tree."""
        tree = getattr(self, "_tree", None)
        return getattr(tree, "_audio_backend", None) if tree else None

    def _release_channel(self) -> None:
        """Stop and forget the backend channel currently held, if any.

        The handle is only cleared when a backend was available to stop it,
        so a handle kept while no backend is reachable is left in place.
        """
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            backend.stop_audio(self._backend_channel)
            self._backend_channel = None

    # ------------------------------------------------------------------
    # Live property push — shared on_change handler
    # ------------------------------------------------------------------

    def _push_audio_state(self, backend, channel) -> None:
        """Push current volume/pan/pitch through ``backend`` for ``channel``.

        Subclasses override this to supply spatial pan/pitch. The base
        non-positional player sends pan=0.
        """
        backend.update_audio_2d(channel, self.volume_db, 0.0)

    def _on_volume_db_changed(self) -> None:
        """Push a live volume change to the active backend channel.

        No-op while pushes are suppressed, while no channel is held, or
        before the backend wires up. Attributes are read with ``getattr``
        defaults because Property on_change can fire during ``__init__``,
        before :meth:`_init_playback` has run.
        """
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        self._push_audio_state(backend, channel)

    def stop(self):
        """Stop playback and release the backend channel."""
        self._playing = False
        self._paused = False
        self._release_channel()

    def pause(self):
        """Pause playback. Call play() to resume."""
        if not self._playing:
            return
        self._paused = True
        self._playing = False
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            backend.pause_audio(self._backend_channel)

    def is_playing(self) -> bool:
        """Check if audio is currently playing."""
        return self._playing and not self._paused

    def is_paused(self) -> bool:
        """Check if audio is currently paused (call ``play()`` to resume)."""
        return self._paused

    def _resume_if_paused(self) -> bool:
        """If paused with a live channel, resume it and return True; otherwise return False."""
        if not self._paused or self._backend_channel is None:
            return False
        self._playing = True
        self._paused = False
        backend = self._get_backend()
        if backend:
            backend.resume_audio(self._backend_channel)
        return True

    def _play_common(self, from_position: float = 0.0) -> bool:
        """Common play() preamble. Returns True if playback should proceed.

        Returns False when there is no stream, or when a paused channel was
        simply resumed (``from_position == 0.0``). Otherwise the caller is
        about to start a fresh backend channel, so any channel still held —
        from a previous play() that is still running, or a paused one being
        restarted from a non-zero position — is stopped first. Without that
        the old handle would be overwritten and its channel could never be
        stopped.
        """
        if not self.stream:
            return False
        if from_position == 0.0 and self._resume_if_paused():
            return False
        self._release_channel()
        self._playing = True
        self._paused = False
        return True

    def _autoplay_check(self):
        """Start playback if autoplay is enabled. Call from ready()."""
        if self.autoplay and self.stream:
            self.play()

    def _exit_tree(self):
        """Run the base class teardown, then stop playback."""
        # NOTE(review): stop() needs the tree to reach the backend; confirm
        # the base _exit_tree() leaves ``_tree`` set until this returns.
        super()._exit_tree()
        self.stop()


# ============================================================================
# AudioStreamPlayer — Background music / UI sounds
# ============================================================================
class AudioStreamPlayer(_AudioPlaybackMixin, Node):
    """Non-positional audio player for background music and UI sounds.

    This player does not use 3D positioning — volume is constant regardless
    of camera position. Use AudioStreamPlayer2D or AudioStreamPlayer3D for
    spatial audio.

    Settings:
        volume_db: Volume in decibels (-80 to 24). 0 = full volume.
        pitch_scale: Playback speed multiplier (0.5 to 2.0).
        bus: Audio bus name ("master", "music", "sfx", etc).
        autoplay: Start playing when added to scene tree.
        loop: Loop playback when finished.
        stream_mode: "memory" loads entire file; "streaming" reads in chunks.
        buffer_size: Chunk size in bytes for streaming mode (default 64KB).
    """

    volume_db = Property(
        0.0,
        range=(-80.0, 24.0),
        hint="Volume in decibels",
        group="Playback",
        on_change="_on_volume_db_changed",
    )
    pitch_scale = Property(1.0, range=(0.5, 2.0), hint="Playback speed", group="Playback")
    bus = Property("master", enum=["master", "music", "sfx", "ui"], hint="Audio bus", group="Playback")
    autoplay = Property(False, hint="Auto-play on ready", group="Playback")
    loop = Property(False, hint="Loop playback", group="Playback")
    stream_mode = Property("memory", enum=["memory", "streaming"], hint="Load mode", group="Playback")
    buffer_size = Property(65536, range=(4096, 524288), hint="Streaming buffer size", group="Playback")

    def __init__(self, stream: AudioSource | AudioStream | None = None, **kwargs):
        super().__init__(**kwargs)
        self._init_playback(stream)
        self._stream_file: Any = None  # Open file handle for streaming mode
        self._stream_offset: int = 0  # Current read offset in file
        self._stream_data_offset: int = 0  # Byte offset past file header (e.g. WAV header)

    def ready(self):
        """Start playback on entering the tree when ``autoplay`` is set."""
        self._autoplay_check()

    def process(self, delta: float):
        """Feed audio chunks to backend in streaming mode."""
        if self.stream_mode == "streaming" and self._playing and not self._paused:
            self._process_streaming()

    def _process_streaming(self):
        """Read next chunk from file and feed to audio backend.

        At end of file the read position rewinds to the start of the audio
        payload when ``loop`` is set; otherwise (or if the rewind yields no
        data) playback stops.
        """
        if not self._stream_file or not self._tree:
            return
        backend = self._get_backend()
        if not backend:
            return
        chunk = self._stream_file.read(self.buffer_size)
        if not chunk:
            if self.loop:
                self._stream_file.seek(self._stream_data_offset)
                chunk = self._stream_file.read(self.buffer_size)
            if not chunk:
                self.stop()
                return
        backend.feed_audio_chunk(self._backend_channel, chunk)

    @staticmethod
    def _wav_data_offset(fh) -> int:
        """Return the byte offset at which streaming should start in *fh*.

        Non-RIFF files start at 0. For RIFF files the chunk list is walked
        to find the ``data`` chunk, so files with an extended ``fmt `` chunk
        or extra chunks ahead of the audio do not leak header bytes into the
        stream. If no ``data`` chunk is found, the canonical 44-byte header
        size is assumed.
        """
        canonical_header = 44  # Standard WAV header size
        fh.seek(0)
        head = fh.read(12)  # "RIFF" + size + form type
        if head[:4] != b"RIFF":
            return 0
        while True:
            chunk_head = fh.read(8)  # 4-byte id + 4-byte little-endian size
            if len(chunk_head) < 8:
                return canonical_header
            if chunk_head[:4] == b"data":
                return fh.tell()
            size = int.from_bytes(chunk_head[4:8], "little")
            # Chunks are padded to an even number of bytes.
            fh.seek(size + (size & 1), os.SEEK_CUR)

    def _close_stream_file(self) -> None:
        """Close the streaming file handle if one is open."""
        if self._stream_file:
            self._stream_file.close()
            self._stream_file = None

    def _open_stream_file(self) -> None:
        """Open ``stream.path`` for chunked reading, positioned at the audio payload.

        On any I/O failure a warning is logged, the handle (if it was
        opened) is closed, and ``_stream_file`` is left as None.
        """
        self._close_stream_file()
        path = self.stream.path
        try:
            fh = open(path, "rb")  # noqa: SIM115 — handle outlives this call
        except OSError:
            log.warning("Failed to open audio stream file: %s", path)
            return
        try:
            offset = self._wav_data_offset(fh)
            fh.seek(offset)
        except OSError:
            # Do not leak the handle when the header cannot be read.
            fh.close()
            log.warning("Failed to open audio stream file: %s", path)
            return
        self._stream_file = fh
        self._stream_data_offset = offset

    def play(self, from_position: float = 0.0):
        """Start or resume playback.

        Args:
            from_position: Start position in seconds (0.0 = beginning).
                NOTE(review): this value is not forwarded to the backend
                here; a non-zero value only forces a restart instead of
                resuming a paused channel.
        """
        if not self._play_common(from_position):
            return

        streaming = self.stream_mode == "streaming"
        if streaming and self.stream.path:
            self._open_stream_file()

        backend = self._get_backend()
        if not backend:
            return
        if streaming:
            self._backend_channel = backend.open_stream(
                volume_db=self.volume_db,
                pitch=self.pitch_scale,
                bus=self.bus,
            )
        else:
            self._backend_channel = backend.play_audio(
                self.stream,
                volume_db=self.volume_db,
                pitch=self.pitch_scale,
                loop=self.loop,
                bus=self.bus,
            )

    def stop(self):
        """Stop playback, closing the streaming file handle if one is open."""
        self._close_stream_file()
        super().stop()

    def get_playback_position(self) -> float:
        """Get current playback position in seconds (0.0 when no channel is active)."""
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            return backend.get_playback_position(self._backend_channel)
        return 0.0
# ============================================================================
# AudioStreamPlayer2D — 2D positional audio
# ============================================================================
class AudioStreamPlayer2D(_AudioPlaybackMixin, Node2D):
    """2D positional audio player with stereo panning.

    Audio volume and pan are calculated based on distance from the 2D
    listener (typically Camera2D position). Left/right panning simulates
    direction.

    Settings:
        volume_db: Base volume in decibels (-80 to 24).
        pitch_scale: Playback speed multiplier (0.5 to 2.0).
        bus: Audio bus name.
        autoplay: Start playing when added to scene tree.
        loop: Loop playback when finished.
        max_distance: Distance at which audio is inaudible (pixels).
        attenuation: Exponent applied to ``distance / max_distance`` before
            scaling to an 80 dB drop (1.0 = linear in dB).
        pan_override: Fixed pan to use instead of the positional pan;
            ``None`` uses the world position.
    """

    volume_db = Property(
        0.0,
        range=(-80.0, 24.0),
        hint="Volume in decibels",
        group="Playback",
        on_change="_on_volume_db_changed",
    )
    pitch_scale = Property(1.0, range=(0.5, 2.0), hint="Playback speed", group="Playback")
    bus = Property("sfx", enum=["master", "music", "sfx", "ui"], hint="Audio bus", group="Playback")
    autoplay = Property(False, hint="Auto-play on ready", group="Playback")
    loop = Property(False, hint="Loop playback", group="Playback")
    max_distance = Property(2000.0, range=(1.0, 10000.0), hint="Max hearing distance (pixels)", group="Spatial")
    attenuation = Property(1.0, range=(0.1, 4.0), hint="Distance attenuation exponent", group="Spatial")
    pan_override = Property(
        None,
        hint="Override positional pan; None = use world position",
        group="Playback",
        on_change="_on_pan_override_changed",
    )

    gizmo_colour = Colour((0.6, 0.4, 1.0, 0.5))

    def get_gizmo_lines(self) -> list[tuple[Vec2, Vec2]]:
        """Return circle showing the audio range."""
        from .physics_nodes import _circle_lines_2d

        p = self.world_position
        return _circle_lines_2d(p.x, p.y, float(self.max_distance), 32)

    def __init__(self, stream: AudioSource | AudioStream | None = None, **kwargs):
        super().__init__(**kwargs)
        self._init_playback(stream)

    def ready(self):
        """Start playback on entering the tree when ``autoplay`` is set."""
        self._autoplay_check()

    # ------------------------------------------------------------------
    # Spatial helpers
    # ------------------------------------------------------------------

    def _attenuated_volume_db(self, distance: float) -> float:
        """Return distance-attenuated volume_db for a given listener distance.

        The result is ``volume_db - 80 * (distance / max_distance) ** attenuation``,
        floored at -80 dB. Beyond ``max_distance`` it is exactly -80 dB. The
        floor keeps the curve from dipping below the out-of-range value when
        ``volume_db`` is negative, which would otherwise make a source just
        inside the range quieter than one just outside it.
        """
        if distance > self.max_distance:
            return -80.0
        dist_ratio = distance / self.max_distance
        return max(-80.0, self.volume_db - 80.0 * (dist_ratio**self.attenuation))

    def _compute_positional_pan(self) -> float:
        """Compute the positional stereo pan (-1=left, 0=center, 1=right).

        Only the horizontal offset from the listener is used, scaled so that
        an offset of ``max_distance`` is a full pan.
        """
        listener = AudioListener.get()
        dx = self.world_position.x - listener.position_2d.x
        return clamp(dx / self.max_distance, -1.0, 1.0)

    def _push_audio_state(self, backend, channel) -> None:
        """Send distance-attenuated volume + (override or positional) pan."""
        listener = AudioListener.get()
        distance = (self.world_position - listener.position_2d).length()
        volume = self._attenuated_volume_db(distance)
        if self.pan_override is not None:
            # Coerced to float, matching AudioStreamPlayer3D.
            pan = float(self.pan_override)
        else:
            pan = self._compute_positional_pan()
        backend.update_audio_2d(channel, volume, pan)

    def _on_pan_override_changed(self) -> None:
        """Push the new pan to the active backend channel.

        Attributes are read with ``getattr`` defaults because a Property
        on_change can fire before playback state has been initialised.
        """
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        self._push_audio_state(backend, channel)

    def set_pan_and_gain(self, pan: float, gain_db: float) -> None:
        """Atomically set ``pan_override`` + ``volume_db`` with one backend call.

        Use when both must land on the same audio frame (e.g. fade-out
        before retrigger). Setting them as separate Property assignments
        would fire two backend updates.
        """
        self._suppress_audio_push = True
        try:
            self.pan_override = pan
            self.volume_db = gain_db
        finally:
            # Always re-enable pushes, even if a Property setter raised.
            self._suppress_audio_push = False
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is not None and channel is not None:
            self._push_audio_state(backend, channel)

    def process(self, delta: float):
        """Update 2D spatialization each frame."""
        if not self._playing or self._paused:
            return
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            self._push_audio_state(backend, self._backend_channel)

    def play(self, from_position: float = 0.0):
        """Start or resume playback.

        Args:
            from_position: Start position in seconds (0.0 = beginning).
                NOTE(review): this value is not forwarded to the backend
                here; a non-zero value only forces a restart instead of
                resuming a paused channel.
        """
        if not self._play_common(from_position):
            return
        backend = self._get_backend()
        if backend:
            self._backend_channel = backend.play_audio_2d(
                self.stream,
                position=self.world_position,
                volume_db=self.volume_db,
                pitch=self.pitch_scale,
                loop=self.loop,
                bus=self.bus,
                max_distance=self.max_distance,
            )
# ============================================================================
# AudioStreamPlayer3D — 3D spatial audio
# ============================================================================
class AudioStreamPlayer3D(_AudioPlaybackMixin, Node3D):
    """3D spatial audio player with distance attenuation and directional panning.

    Audio volume and stereo panning are calculated based on distance and
    direction from the 3D listener (typically Camera3D position/orientation).

    Settings:
        volume_db: Base volume in decibels (-80 to 24).
        pitch_scale: Playback speed multiplier (0.5 to 2.0).
        bus: Audio bus name.
        autoplay: Start playing when added to scene tree.
        loop: Loop playback when finished.
        max_distance: Distance at which audio is inaudible (world units).
        attenuation: Exponent applied to ``distance / max_distance`` before
            scaling to an 80 dB drop (1.0 = linear in dB).
        doppler_scale: Doppler effect strength (0.0 = off, 1.0 = realistic).
        pan_override: Fixed pan to use instead of the directional pan;
            ``None`` uses the world position.
    """

    volume_db = Property(
        0.0,
        range=(-80.0, 24.0),
        hint="Volume in decibels",
        group="Playback",
        on_change="_on_volume_db_changed",
    )
    pitch_scale = Property(1.0, range=(0.5, 2.0), hint="Playback speed", group="Playback")
    bus = Property("sfx", enum=["master", "music", "sfx", "ui"], hint="Audio bus", group="Playback")
    autoplay = Property(False, hint="Auto-play on ready", group="Playback")
    loop = Property(False, hint="Loop playback", group="Playback")
    max_distance = Property(100.0, range=(1.0, 1000.0), hint="Max hearing distance", group="Spatial")
    attenuation = Property(1.0, range=(0.1, 4.0), hint="Distance attenuation exponent", group="Spatial")
    doppler_scale = Property(0.0, range=(0.0, 4.0), hint="Doppler effect strength", group="Spatial")
    pan_override = Property(
        None,
        hint="Override directional pan; None = use world position",
        group="Playback",
        on_change="_on_pan_override_changed",
    )

    gizmo_colour = Colour((0.6, 0.4, 1.0, 0.5))

    def get_gizmo_lines(self) -> list[tuple[Vec3, Vec3]]:
        """Return 3 circles showing the audio range sphere."""
        from .physics_nodes import _circle_lines_3d

        p = self.world_position
        r = float(self.max_distance)
        lines: list[tuple[Vec3, Vec3]] = []
        # One circle per pair of world axes: XY, XZ and YZ.
        lines.extend(_circle_lines_3d(p, Vec3(1, 0, 0), Vec3(0, 1, 0), r))
        lines.extend(_circle_lines_3d(p, Vec3(1, 0, 0), Vec3(0, 0, 1), r))
        lines.extend(_circle_lines_3d(p, Vec3(0, 1, 0), Vec3(0, 0, 1), r))
        return lines

    def __init__(self, stream: AudioSource | AudioStream | None = None, **kwargs):
        super().__init__(**kwargs)
        self._init_playback(stream)
        self._prev_position: Vec3 = Vec3()  # For Doppler

    def ready(self):
        """Baseline the Doppler position, then honour ``autoplay``."""
        # Stored as a copy, as process() does, so the baseline can never
        # alias a value that is later updated in place.
        self._prev_position = Vec3(self.world_position)
        self._autoplay_check()

    # ------------------------------------------------------------------
    # Spatial helpers
    # ------------------------------------------------------------------

    def _compute_3d_state(self, delta: float) -> tuple[float, float, float]:
        """Return (volume_db, pan, pitch) for the current frame.

        ``delta`` drives Doppler. Pass 0.0 from on_change handlers (no
        Doppler kick on a Property nudge — Doppler reapplies next frame).
        ``_prev_position`` is left untouched here; ``process()`` advances it
        each frame to keep Doppler velocity stable.
        """
        listener = AudioListener.get()
        to_source = self.world_position - listener.position_3d
        distance = to_source.length()

        # Distance attenuation, floored at -80 dB so a negative volume_db
        # cannot make an in-range source quieter than an out-of-range one.
        if distance > self.max_distance:
            volume = -80.0
        else:
            d = max(distance, 0.1)  # treat anything closer than 0.1 units as 0.1
            dist_ratio = d / self.max_distance
            volume = max(-80.0, self.volume_db - 80.0 * (dist_ratio**self.attenuation))

        # Pan: positional unless caller has overridden it
        if self.pan_override is not None:
            pan = float(self.pan_override)
        elif distance > 0.01:
            # Project the unit direction to the source onto the listener's
            # right-hand axis (forward x up).
            to_source_norm = to_source / distance
            right = listener.forward.cross(listener.up)
            pan = clamp(to_source_norm.dot(right), -1.0, 1.0)
        else:
            # Source sits on the listener: direction is undefined, centre it.
            pan = 0.0

        # Doppler — pitch shift from the source's radial velocity
        pitch = self.pitch_scale
        if self.doppler_scale > 0.0 and delta > 0.0:
            velocity = (self.world_position - self._prev_position) / delta
            # Positive when the source moves toward the listener.
            velocity_towards = velocity.dot(-to_source) / (distance if distance > 0.1 else 0.1)
            speed_of_sound = 343.0
            doppler_factor = 1.0 + (velocity_towards / speed_of_sound) * self.doppler_scale
            pitch = self.pitch_scale * clamp(doppler_factor, 0.5, 2.0)

        return volume, pan, pitch

    def _push_audio_state(self, backend, channel) -> None:
        """Send current 3D volume/pan/pitch (without advancing Doppler velocity)."""
        volume, pan, pitch = self._compute_3d_state(0.0)
        backend.update_audio_3d(channel, volume, pan, pitch)

    def _on_pan_override_changed(self) -> None:
        """Push the new pan to the active backend channel.

        Attributes are read with ``getattr`` defaults because a Property
        on_change can fire before playback state has been initialised.
        """
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        self._push_audio_state(backend, channel)

    def set_pan_and_gain(self, pan: float, gain_db: float) -> None:
        """Atomically set ``pan_override`` + ``volume_db`` with one backend call."""
        self._suppress_audio_push = True
        try:
            self.pan_override = pan
            self.volume_db = gain_db
        finally:
            # Always re-enable pushes, even if a Property setter raised.
            self._suppress_audio_push = False
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is not None and channel is not None:
            self._push_audio_state(backend, channel)

    def process(self, delta: float):
        """Update 3D spatialization each frame."""
        if not self._playing or self._paused:
            return
        volume, pan, pitch = self._compute_3d_state(delta)
        self._prev_position = Vec3(self.world_position)
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            backend.update_audio_3d(self._backend_channel, volume, pan, pitch)

    def play(self, from_position: float = 0.0):
        """Start or resume playback.

        Args:
            from_position: Start position in seconds (0.0 = beginning).
                NOTE(review): this value is not forwarded to the backend
                here; a non-zero value only forces a restart instead of
                resuming a paused channel.
        """
        # Re-baseline Doppler before the resume check. process() does not
        # advance _prev_position while paused, so without this the first
        # frame after a resume would derive its velocity from wherever the
        # node was when it was paused.
        self._prev_position = Vec3(self.world_position)
        if not self._play_common(from_position):
            return
        backend = self._get_backend()
        if backend:
            self._backend_channel = backend.play_audio_3d(
                self.stream,
                position=self.world_position,
                volume_db=self.volume_db,
                pitch=self.pitch_scale,
                loop=self.loop,
                bus=self.bus,
                max_distance=self.max_distance,
            )