"""Frame profiler with ring-buffer timing and per-node hotspot tracking."""
import logging
import os
import time
from collections import deque
log = logging.getLogger(__name__)
__all__ = ["FrameProfiler"]
# Number of committed frames the phase-timing ring keeps (~10s at 60fps).
_RING_SIZE = 600
# Number of samples kept per GPU pass label for sparklines (~2s at 60fps).
_GPU_HISTORY_SIZE = 120

# Memory page size in bytes, used to convert the page count read from
# /proc/self/statm into bytes. Falls back to 4096 when os.sysconf is missing
# or rejects the name.
try:
    _PAGE_SIZE: int = os.sysconf("SC_PAGE_SIZE")  # type: ignore[attr-defined]
except (AttributeError, ValueError, OSError):
    _PAGE_SIZE = 4096

# Weight given to the newest sample in the per-node hotspot exponential moving
# average. With 0.1, each frame keeps 90% of the previous average and blends
# in 10% of the new sample: stable enough for a live display while still
# responsive to genuine hotspot shifts.
_HOTSPOT_EMA_ALPHA = 0.1
[docs]
class FrameProfiler:
    """Ring-buffer frame timer + per-node hotspot sampler.

    Layers of data:

    * **Phases** — named sections (``process``, ``physics``, ``draw``...).
      ``begin(phase)`` / ``end(phase)`` time code blocks; ``end_frame()``
      commits the per-phase timings into the ring buffer.
    * **Counts** — ``count_nodes(tree)`` walks the tree to populate
      ``node_count`` / ``control_count``. Renderer stats (mesh count, draw
      calls) are stored with ``set_renderer_counts()``.
    * **Hotspots** — ``sample_node(path, phase, ms)`` accumulates per-node
      process / physics time. ``hotspots(n)`` returns the top-N by
      EMA-smoothed time.
    * **GPU phases** — ``record_gpu_phase(name, ms)`` keeps a bounded history
      of samples per pass label.

    ``enabled=False`` is a cheap guard; ``begin``/``end``/``sample_node``
    return immediately so the instrumentation has near-zero cost when
    disabled.
    """

    def __init__(self) -> None:
        self.enabled: bool = True
        # Fixed-size ring of committed frames; each slot maps phase -> ms.
        self._ring: list[dict[str, float]] = [{} for _ in range(_RING_SIZE)]
        self._index: int = 0  # next ring slot to be written
        self._samples: int = 0  # total frames recorded (saturates at _RING_SIZE)
        self._current: dict[str, float] = {}  # phase -> ms for the frame in progress
        self._starts: dict[str, float] = {}  # phase -> perf_counter() taken at begin()
        self._node_count: int = 0
        self._control_count: int = 0
        self._mesh_count: int = 0
        self._draw_call_count: int = 0
        self._last_frame_time: float = 0.0  # ms between the last two end_frame() calls
        self._last_frame_ts: float = time.perf_counter()
        # Per-node timings for the currently-running frame:
        # path -> [process_ms, physics_ms, call_count]
        self._node_current: dict[str, list[float]] = {}
        # Exponential moving average of per-node total (process+physics) time:
        # path -> ema_ms
        self._node_ema: dict[str, float] = {}
        # Per-pass GPU timing ring buffers populated from the renderer's
        # timestamp queries (lag of FRAMES_IN_FLIGHT frames). One deque per
        # label, oldest sample at index 0.
        self._gpu_phase_history: dict[str, deque[float]] = {}
        self._gpu_phase_latest: dict[str, float] = {}

    # ------------------------------------------------------------------
    # Phase timing (begin / end / end_frame)
    # ------------------------------------------------------------------

    def begin(self, phase: str) -> None:
        """Start timing ``phase``. No-op while the profiler is disabled."""
        if not self.enabled:
            return
        self._starts[phase] = time.perf_counter()

    def end(self, phase: str) -> None:
        """Stop timing ``phase`` and store its duration (ms) for this frame.

        Ignored while disabled, or when there is no pending ``begin(phase)``.
        Timing the same phase twice in one frame keeps only the later
        measurement.
        """
        if not self.enabled:
            return
        start = self._starts.pop(phase, None)
        if start is not None:
            self._current[phase] = (time.perf_counter() - start) * 1000.0

    def end_frame(self) -> None:
        """Commit the frame in progress to the ring buffer.

        Updates the frame time (the wall-clock gap since the previous
        ``end_frame()``), writes the collected phase timings into the next
        ring slot, discards any phases that were begun but never ended, and
        folds the frame's per-node samples into the hotspot averages.
        """
        now = time.perf_counter()
        if not self.enabled:
            # Keep the frame clock current so the first frame after
            # re-enabling is not charged for the whole disabled span.
            self._last_frame_ts = now
            return
        self._last_frame_time = (now - self._last_frame_ts) * 1000.0
        self._last_frame_ts = now
        # Auto-populate a "total" phase if the caller didn't time it explicitly.
        self._current.setdefault("total", self._last_frame_time)
        self._ring[self._index] = dict(self._current)
        self._index = (self._index + 1) % _RING_SIZE
        self._samples = min(self._samples + 1, _RING_SIZE)
        self._current.clear()
        self._starts.clear()
        self._commit_node_ema()

    # ------------------------------------------------------------------
    # Scene counts
    # ------------------------------------------------------------------

    def count_nodes(self, tree) -> None:
        """Traverse ``tree`` and count nodes/controls. Call once per frame.

        Walks from ``tree.root`` through each node's ``children``. Both
        counts are reset to zero when the profiler is disabled or there is no
        tree / root to walk.
        """
        if not self.enabled or tree is None or tree.root is None:
            self._node_count = 0
            self._control_count = 0
            return
        from ..ui import Control

        nodes = 0
        controls = 0
        # Explicit stack instead of recursion so deep trees cannot hit the
        # interpreter's recursion limit.
        stack = [tree.root]
        while stack:
            node = stack.pop()
            nodes += 1
            if isinstance(node, Control):
                controls += 1
            stack.extend(node.children)
        self._node_count = nodes
        self._control_count = controls

    def set_renderer_counts(self, *, meshes: int = 0, draw_calls: int = 0) -> None:
        """Record renderer-side counters (set by the graphics backend)."""
        self._mesh_count = meshes
        self._draw_call_count = draw_calls

    # ------------------------------------------------------------------
    # Per-node hotspot tracking
    # ------------------------------------------------------------------

    def sample_node(self, path: str, phase: str, ms: float) -> None:
        """Record that ``path`` spent ``ms`` milliseconds in ``phase``.

        ``phase`` is one of ``"process"`` or ``"physics"``; the time of any
        other phase is not accumulated, although the call is still counted.
        Samples accumulate across the frame; ``end_frame()`` folds them into
        the EMA.
        """
        if not self.enabled:
            return
        entry = self._node_current.get(path)
        if entry is None:
            entry = [0.0, 0.0, 0]
            self._node_current[path] = entry
        if phase == "process":
            entry[0] += ms
        elif phase == "physics":
            entry[1] += ms
        entry[2] += 1

    def _commit_node_ema(self) -> None:
        """Fold the frame's per-node samples into the long-running EMA."""
        alpha = _HOTSPOT_EMA_ALPHA
        keep = 1.0 - alpha
        ema = self._node_ema
        current = self._node_current
        # Nodes sampled this frame get their total time blended in.
        for path, (process_ms, physics_ms, _calls) in current.items():
            ema[path] = ema.get(path, 0.0) * keep + (process_ms + physics_ms) * alpha
        # Nodes not sampled this frame decay; drop them once near zero.
        for path in [p for p in ema if p not in current]:
            decayed = ema[path] * keep
            if decayed < 0.005:  # sub-5µs — forget it
                del ema[path]
            else:
                ema[path] = decayed
        current.clear()

    def hotspots(self, n: int = 10) -> list[tuple[str, float]]:
        """Return the top-N node paths by smoothed per-frame time (ms).

        The result is ordered slowest first. A non-positive ``n`` yields an
        empty list.
        """
        if n <= 0 or not self._node_ema:
            return []
        ranked = sorted(self._node_ema.items(), key=lambda kv: kv[1], reverse=True)
        return ranked[:n]

    # ------------------------------------------------------------------
    # GPU phase timing (driven by the renderer's TimestampPool)
    # ------------------------------------------------------------------

    def record_gpu_phase(self, name: str, ms: float) -> None:
        """Append a per-pass GPU timing sample (in milliseconds).

        Producers (the editor's per-frame tick) copy ``engine.gpu_phase_times``
        into the profiler one entry at a time. Each label keeps its own
        ring buffer so passes that are conditionally enabled don't pollute
        the history of unrelated passes.
        """
        if not self.enabled:
            return
        history = self._gpu_phase_history.get(name)
        if history is None:
            history = deque(maxlen=_GPU_HISTORY_SIZE)
            self._gpu_phase_history[name] = history
        history.append(ms)
        self._gpu_phase_latest[name] = ms

    def gpu_phase_history_for(self, name: str, count: int = _GPU_HISTORY_SIZE) -> list[float]:
        """Recent GPU samples for ``name`` (oldest first).

        Returns at most ``count`` of the newest samples, or an empty list
        when the label is unknown or ``count`` is not positive.
        """
        history = self._gpu_phase_history.get(name)
        # count <= 0 must be handled explicitly: samples[-0:] is the whole list.
        if not history or count <= 0:
            return []
        samples = list(history)
        return samples if count >= len(samples) else samples[-count:]

    def gpu_phase_latest(self) -> dict[str, float]:
        """Latest published GPU sample per label (label -> ms), as a copy."""
        return dict(self._gpu_phase_latest)

    def clear_gpu_phases(self) -> None:
        """Drop all GPU history. Used by callers when the renderer goes away."""
        self._gpu_phase_history.clear()
        self._gpu_phase_latest.clear()

    # ------------------------------------------------------------------
    # Memory
    # ------------------------------------------------------------------

    def memory_mb(self) -> float:
        """Return the process's resident set size in megabytes, or 0.0 if unavailable.

        Reads the current RSS from ``/proc/self/statm`` when that file is
        readable (Linux). Otherwise falls back to ``resource.getrusage``,
        whose ``ru_maxrss`` is the *peak* RSS rather than the current one.
        Returns 0.0 when neither source is usable, e.g. on Windows, where the
        ``resource`` module does not exist.
        """
        # Linux fast path: the second statm field is the resident page count.
        try:
            with open("/proc/self/statm") as f:
                rss_pages = int(f.read().split()[1])
        except (OSError, ValueError, IndexError):
            # No usable /proc entry (non-Linux or unexpected format);
            # fall through to the POSIX rusage path.
            pass
        else:
            return rss_pages * _PAGE_SIZE / (1024.0 * 1024.0)

        # POSIX fallback
        try:
            import resource  # noqa: PLC0415
            import sys  # noqa: PLC0415

            peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        except (ImportError, AttributeError, OSError, ValueError):
            return 0.0
        # ru_maxrss is in bytes on macOS and in kilobytes on Linux. Decide by
        # platform: guessing from the magnitude misreports small macOS
        # processes by a factor of 1024.
        if sys.platform == "darwin":
            return peak / (1024.0 * 1024.0)
        return peak / 1024.0

    # ------------------------------------------------------------------
    # Read-only properties and queries
    # ------------------------------------------------------------------

    @property
    def fps(self) -> float:
        """Frames per second implied by the last frame time (0.0 before any frame)."""
        return 1000.0 / self._last_frame_time if self._last_frame_time > 0 else 0.0

    @property
    def frame_time_ms(self) -> float:
        """Milliseconds between the two most recent ``end_frame()`` calls."""
        return self._last_frame_time

    @property
    def node_count(self) -> int:
        """Node total from the last ``count_nodes()`` call."""
        return self._node_count

    @property
    def control_count(self) -> int:
        """``Control`` node total from the last ``count_nodes()`` call."""
        return self._control_count

    @property
    def mesh_count(self) -> int:
        """Mesh count last passed to ``set_renderer_counts()``."""
        return self._mesh_count

    @property
    def draw_call_count(self) -> int:
        """Draw-call count last passed to ``set_renderer_counts()``."""
        return self._draw_call_count

    @property
    def last_frame(self) -> dict[str, float]:
        """Phase timings from the most recent completed frame.

        Returns a copy, so callers cannot alter the recorded history and the
        result stays valid after ``reset()``. Empty before the first frame.
        """
        idx = (self._index - 1) % _RING_SIZE
        return dict(self._ring[idx])

    @property
    def sample_count(self) -> int:
        """Total frames recorded (saturates at ring capacity)."""
        return self._samples

    def _recent_frames(self, count: int) -> list[dict[str, float]]:
        """Return up to ``count`` committed frames from the ring, newest first."""
        newest = self._index - 1
        return [
            self._ring[(newest - back) % _RING_SIZE]
            for back in range(min(count, self._samples))
        ]

    def phase_avg_ms(self, phase: str, count: int = 60) -> float:
        """Average time for a phase over the last ``count`` frames.

        Frames that did not record ``phase`` are skipped rather than counted
        as zero. Returns 0.0 when no frame in the window recorded it.
        """
        values = [frame[phase] for frame in self._recent_frames(count) if phase in frame]
        return sum(values) / len(values) if values else 0.0

    def phase_max_ms(self, phase: str, count: int = 60) -> float:
        """Maximum time for a phase over the last ``count`` frames (0.0 if none)."""
        return max(
            (frame.get(phase, 0.0) for frame in self._recent_frames(count)),
            default=0.0,
        )

    def frame_times(self, count: int = 120) -> list[float]:
        """Recent ``total`` phase times for graph rendering (oldest first)."""
        return self.phase_history("total", count)

    def phase_history(self, phase: str, count: int = 120) -> list[float]:
        """Per-frame time for ``phase`` over recent frames (oldest first).

        Frames that did not record ``phase`` contribute 0.0 so the result
        stays aligned one entry per frame.
        """
        history = [frame.get(phase, 0.0) for frame in self._recent_frames(count)]
        history.reverse()
        return history

    def reset(self) -> None:
        """Discard all recorded samples and hotspot averages.

        Clears the phase ring, in-progress timings, per-node data and GPU
        history, and restarts the frame clock. Scene and renderer counts are
        left as they are.
        """
        for entry in self._ring:
            entry.clear()
        self._index = 0
        self._samples = 0
        self._current.clear()
        self._starts.clear()
        self._node_current.clear()
        self._node_ema.clear()
        self._gpu_phase_history.clear()
        self._gpu_phase_latest.clear()
        self._last_frame_time = 0.0
        self._last_frame_ts = time.perf_counter()