Source code for simvx.core.debug.profiler

"""Frame profiler with ring-buffer timing and per-node hotspot tracking."""

import logging
import os
import time
from collections import deque

log = logging.getLogger(__name__)

__all__ = ["FrameProfiler"]

_RING_SIZE = 600  # ~10s at 60fps
_GPU_HISTORY_SIZE = 120  # ~2s at 60fps for sparklines

try:
    _PAGE_SIZE: int = os.sysconf("SC_PAGE_SIZE")  # type: ignore[attr-defined]
except (AttributeError, ValueError, OSError):
    _PAGE_SIZE = 4096

# Exponential moving average smoothing factor for per-node hotspot averages.
# With alpha = 0.1 each frame's sample contributes 10% and the previous
# average keeps the remaining 90% — stable for a live display while still
# responsive to genuine hotspot shifts.
_HOTSPOT_EMA_ALPHA = 0.1

# Hotspot averages (in ms) that decay below this are forgotten: 0.005 ms = 5µs.
_HOTSPOT_FORGET_MS = 0.005


class FrameProfiler:
    """Ring-buffer frame timer + per-node hotspot sampler.

    Three layers of data:

    * **Phases** — named sections (``process``, ``physics``, ``draw``...).
      ``begin(phase)`` / ``end(phase)`` time code blocks; ``end_frame()``
      commits the per-phase timings into the ring buffer.
    * **Counts** — ``count_nodes(tree)`` walks the tree once per frame to
      populate ``node_count`` / ``control_count``. Renderer stats (mesh
      count, draw calls) are stored via ``set_renderer_counts()``.
    * **Hotspots** — ``sample_node(path, phase, ms)`` accumulates per-node
      process / physics time. ``hotspots(n)`` returns the top-N by
      EMA-smoothed time.

    ``enabled=False`` is a cheap guard; ``begin``/``end``/``sample_node``
    return immediately so the instrumentation has near-zero cost when
    disabled.
    """

    def __init__(self):
        self.enabled: bool = True
        self._ring: list[dict[str, float]] = [{} for _ in range(_RING_SIZE)]
        self._index: int = 0  # next ring slot to write
        self._samples: int = 0  # total frames recorded (saturates at _RING_SIZE)
        self._current: dict[str, float] = {}  # phase -> ms for the frame in progress
        self._starts: dict[str, float] = {}  # phase -> perf_counter() at begin()
        self._node_count: int = 0
        self._control_count: int = 0
        self._mesh_count: int = 0
        self._draw_call_count: int = 0
        self._last_frame_time: float = 0.0
        self._last_frame_ts: float = time.perf_counter()
        # Per-node timings for the currently-running frame:
        # path -> [process_ms, physics_ms, call_count]
        self._node_current: dict[str, list[float]] = {}
        # Exponential moving average of per-node total (process+physics) time:
        # path -> ema_ms
        self._node_ema: dict[str, float] = {}
        # Per-pass GPU timing ring buffers populated from the renderer's
        # timestamp queries (lag of FRAMES_IN_FLIGHT frames). One deque per
        # label, oldest sample at index 0.
        self._gpu_phase_history: dict[str, deque[float]] = {}
        self._gpu_phase_latest: dict[str, float] = {}

    # ------------------------------------------------------------------
    # Phase timing (begin / end / end_frame)
    # ------------------------------------------------------------------

    def begin(self, phase: str):
        """Start timing ``phase``; a repeated call restarts its clock."""
        if not self.enabled:
            return
        self._starts[phase] = time.perf_counter()

    def end(self, phase: str):
        """Stop timing ``phase`` and store its elapsed milliseconds.

        Does nothing if ``begin(phase)`` was not called first. A second
        begin/end pair for the same phase within one frame overwrites the
        earlier value rather than adding to it.
        """
        if not self.enabled:
            return
        start = self._starts.pop(phase, None)
        if start is not None:
            self._current[phase] = (time.perf_counter() - start) * 1000.0

    def end_frame(self):
        """Commit the frame's phase timings to the ring and fold node samples."""
        if not self.enabled:
            return
        now = time.perf_counter()
        self._last_frame_time = (now - self._last_frame_ts) * 1000.0
        self._last_frame_ts = now
        # Auto-populate a "total" phase if the caller didn't time it explicitly.
        if "total" not in self._current:
            self._current["total"] = self._last_frame_time
        # Store a copy so clearing ``_current`` below leaves the ring entry intact.
        self._ring[self._index] = dict(self._current)
        self._index = (self._index + 1) % _RING_SIZE
        self._samples = min(self._samples + 1, _RING_SIZE)
        self._current.clear()
        self._starts.clear()  # phases still open at frame end are discarded
        self._commit_node_ema()

    # ------------------------------------------------------------------
    # Scene counts
    # ------------------------------------------------------------------

    def count_nodes(self, tree):
        """Traverse tree and count nodes/controls. Call once per frame.

        Both counts are zeroed when the profiler is disabled or there is no
        tree / root to walk.
        """
        if not self.enabled or tree is None or tree.root is None:
            self._node_count = 0
            self._control_count = 0
            return
        from ..ui import Control

        nodes = 0
        controls = 0
        # Iterative depth-first walk using an explicit stack.
        stack = [tree.root]
        while stack:
            node = stack.pop()
            nodes += 1
            if isinstance(node, Control):
                controls += 1
            stack.extend(node.children)
        self._node_count = nodes
        self._control_count = controls

    def set_renderer_counts(self, *, meshes: int = 0, draw_calls: int = 0) -> None:
        """Record renderer-side counters (set by the graphics backend)."""
        self._mesh_count = meshes
        self._draw_call_count = draw_calls

    # ------------------------------------------------------------------
    # Per-node hotspot tracking
    # ------------------------------------------------------------------

    def sample_node(self, path: str, phase: str, ms: float) -> None:
        """Record that ``path`` spent ``ms`` milliseconds in ``phase``.

        ``phase`` is one of ``"process"`` or ``"physics"``. Samples
        accumulate across the frame; ``end_frame()`` folds them into the EMA.
        Any other phase name adds no time but still bumps the call count.
        """
        if not self.enabled:
            return
        entry = self._node_current.get(path)
        if entry is None:
            entry = [0.0, 0.0, 0]
            self._node_current[path] = entry
        if phase == "process":
            entry[0] += ms
        elif phase == "physics":
            entry[1] += ms
        entry[2] += 1

    def _commit_node_ema(self) -> None:
        """Fold the frame's per-node samples into the long-running EMA."""
        alpha = _HOTSPOT_EMA_ALPHA
        keep = 1.0 - alpha
        ema = self._node_ema
        frame = self._node_current
        # Nodes not sampled this frame decay toward zero and are dropped once
        # negligible. The key difference is a fresh set, so deleting from
        # ``ema`` while looping over it is safe.
        for path in ema.keys() - frame.keys():
            decayed = ema[path] * keep
            if decayed < _HOTSPOT_FORGET_MS:
                del ema[path]
            else:
                ema[path] = decayed
        # Sampled nodes get their total (process + physics) time blended in.
        for path, entry in frame.items():
            ema[path] = ema.get(path, 0.0) * keep + (entry[0] + entry[1]) * alpha
        frame.clear()

    def hotspots(self, n: int = 10) -> list[tuple[str, float]]:
        """Return the top-N node paths by smoothed per-frame time (ms).

        The result is sorted slowest first; ``n <= 0`` yields an empty list.
        """
        if n <= 0 or not self._node_ema:
            return []
        ranked = sorted(self._node_ema.items(), key=lambda kv: kv[1], reverse=True)
        return ranked[:n]

    # ------------------------------------------------------------------
    # GPU phase timing (driven by the renderer's TimestampPool)
    # ------------------------------------------------------------------

    def record_gpu_phase(self, name: str, ms: float) -> None:
        """Append a per-pass GPU timing sample (in milliseconds).

        Producers (the editor's per-frame tick) copy ``engine.gpu_phase_times``
        into the profiler one entry at a time. Each label keeps its own ring
        buffer so passes that are conditionally enabled don't pollute the
        history of unrelated passes.
        """
        if not self.enabled:
            return
        history = self._gpu_phase_history.get(name)
        if history is None:
            history = deque(maxlen=_GPU_HISTORY_SIZE)
            self._gpu_phase_history[name] = history
        history.append(ms)
        self._gpu_phase_latest[name] = ms

    def gpu_phase_history_for(self, name: str, count: int = _GPU_HISTORY_SIZE) -> list[float]:
        """Recent GPU samples for ``name`` (oldest first).

        Returns at most ``count`` samples; an unknown label or ``count <= 0``
        yields an empty list.
        """
        history = self._gpu_phase_history.get(name)
        # Guard ``count <= 0`` explicitly: ``samples[-0:]`` would be the whole
        # list, and a negative count would slice from the wrong end.
        if not history or count <= 0:
            return []
        samples = list(history)
        return samples if count >= len(samples) else samples[-count:]

    def gpu_phase_latest(self) -> dict[str, float]:
        """Latest published GPU sample per label (label -> ms)."""
        return dict(self._gpu_phase_latest)

    def clear_gpu_phases(self) -> None:
        """Drop all GPU history. Used by callers when the renderer goes away."""
        self._gpu_phase_history.clear()
        self._gpu_phase_latest.clear()

    # ------------------------------------------------------------------
    # Memory
    # ------------------------------------------------------------------

    def memory_mb(self) -> float:
        """Return the process resident set size in megabytes, or 0.0 if unavailable.

        Reads the current RSS from ``/proc/self/statm`` where that file
        exists (Linux). Elsewhere falls back to ``resource.getrusage``, whose
        ``ru_maxrss`` is the *peak* RSS rather than the current one. Returns
        0.0 when neither source is available (e.g. Windows).
        """
        # Linux fast path: the second field of statm is resident pages.
        try:
            with open("/proc/self/statm", encoding="ascii") as f:
                rss_pages = int(f.read().split()[1])
        except (OSError, ValueError, IndexError):
            pass  # no procfs or unexpected format; fall through to rusage
        else:
            return rss_pages * _PAGE_SIZE / (1024.0 * 1024.0)

        # POSIX fallback
        try:
            import resource  # noqa: PLC0415

            peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        except (ImportError, AttributeError, OSError, ValueError):
            return 0.0

        import sys  # noqa: PLC0415

        # ru_maxrss is in bytes on macOS and in kilobytes on Linux/BSD. Decide
        # by platform: guessing from magnitude misreads small macOS processes.
        if sys.platform == "darwin":
            return peak / (1024.0 * 1024.0)
        return peak / 1024.0

    # ------------------------------------------------------------------
    # Read-only properties and queries
    # ------------------------------------------------------------------

    @property
    def fps(self) -> float:
        """Frames per second derived from the last frame time (0.0 if unknown)."""
        return 1000.0 / self._last_frame_time if self._last_frame_time > 0 else 0.0

    @property
    def frame_time_ms(self) -> float:
        """Wall-clock milliseconds between the two most recent ``end_frame()`` calls."""
        return self._last_frame_time

    @property
    def node_count(self) -> int:
        """Node total from the last ``count_nodes()`` call."""
        return self._node_count

    @property
    def control_count(self) -> int:
        """``Control`` node total from the last ``count_nodes()`` call."""
        return self._control_count

    @property
    def mesh_count(self) -> int:
        """Mesh count last passed to ``set_renderer_counts()``."""
        return self._mesh_count

    @property
    def draw_call_count(self) -> int:
        """Draw-call count last passed to ``set_renderer_counts()``."""
        return self._draw_call_count

    @property
    def last_frame(self) -> dict[str, float]:
        """Phase timings from the most recent completed frame.

        This is the ring entry itself, not a copy — treat it as read-only.
        It is empty before the first frame has been committed.
        """
        return self._ring[(self._index - 1) % _RING_SIZE]

    @property
    def sample_count(self) -> int:
        """Total frames recorded (saturates at ring capacity)."""
        return self._samples

    def _recent_frames(self, count: int) -> list[dict[str, float]]:
        """Return up to ``count`` committed frame dicts, newest first."""
        count = min(count, self._samples)
        return [self._ring[(self._index - back) % _RING_SIZE] for back in range(1, count + 1)]

    def phase_avg_ms(self, phase: str, count: int = 60) -> float:
        """Average time for a phase over the last ``count`` frames.

        Frames in which the phase was not recorded are left out of the
        average; returns 0.0 when no frame in the window recorded it.
        """
        values = [frame[phase] for frame in self._recent_frames(count) if phase in frame]
        return sum(values) / len(values) if values else 0.0

    def phase_max_ms(self, phase: str, count: int = 60) -> float:
        """Maximum time for a phase over the last ``count`` frames."""
        samples = (frame.get(phase, 0.0) for frame in self._recent_frames(count))
        # The leading 0.0 is both the floor and the result for an empty window.
        return max((0.0, *samples))

    def frame_times(self, count: int = 120) -> list[float]:
        """Recent ``total`` phase times for graph rendering (oldest first)."""
        return self.phase_history("total", count)

    def phase_history(self, phase: str, count: int = 120) -> list[float]:
        """Per-frame time for ``phase`` over recent frames (oldest first).

        Frames in which the phase was not recorded contribute 0.0.
        """
        history = [frame.get(phase, 0.0) for frame in self._recent_frames(count)]
        history.reverse()
        return history

    def reset(self) -> None:
        """Discard all recorded samples and hotspot averages.

        Scene and renderer counts are left untouched.
        """
        for entry in self._ring:
            entry.clear()
        self._index = 0
        self._samples = 0
        self._current.clear()
        self._starts.clear()
        self._node_current.clear()
        self._node_ema.clear()
        self._gpu_phase_history.clear()
        self._gpu_phase_latest.clear()
        self._last_frame_time = 0.0
        self._last_frame_ts = time.perf_counter()