Source code for simvx.core.event_bus

"""Typed event bus with weak handler references.

Handlers connect against a *dataclass* event type. Calling
:meth:`EventBus.emit` with an instance of that dataclass synchronously fans
the event out to every live handler connected for the *exact* class.
There is no MRO walk: a handler for ``Parent`` does NOT receive emits of
``Child(Parent)`` -- subclasses must connect themselves. This keeps
dispatch predictable and O(1) per event class.

Handlers are stored as weak references:

* Bound methods are wrapped in :class:`weakref.WeakMethod` so the
  connection is automatically dropped when the owning instance is garbage
  collected.
* Free functions and other callables are stored via :class:`weakref.ref`.
  CPython 3.13+ supports weakrefs to module-level functions, but local
  closures and ``functools.partial`` objects do not. Those raise
  :class:`TypeError` at connect time -- callers must use a method on a
  long-lived object, or hold their own strong reference and pass a
  module-level function.

Two dispatch modes:

* :meth:`emit` -- synchronous. Handlers run in registration order on the
  calling thread before ``emit`` returns.
* :meth:`emit_deferred` -- queues the event. The queue is drained when
  :meth:`flush_deferred` is called (the SceneTree integration in C2 will
  call this once per frame).

Threading: the bus assumes a single-threaded engine. There are no locks.
Calling ``emit`` from a non-main thread is undefined behaviour.
"""

from __future__ import annotations

import dataclasses
import weakref
from collections.abc import Callable
from typing import Any

EventCls = type
Handler = Callable[[Any], None]


[docs] class EventBus: """Typed publish/subscribe bus keyed on dataclass event types. Uses ``connect``/``disconnect`` to mirror :class:`Signal` vocabulary so both callback systems share one verb pair across the engine. """ __slots__ = ("_subs", "_deferred") def __init__(self) -> None: self._subs: dict[EventCls, set[weakref.ReferenceType[Any]]] = {} self._deferred: list[Any] = [] # ------------------------------------------------------------------ # Subscription # ------------------------------------------------------------------
[docs] def connect(self, event_cls: EventCls, handler: Handler) -> None: """Register ``handler`` for events of exactly ``event_cls``. Bound methods are stored via :class:`weakref.WeakMethod`. Other callables use :class:`weakref.ref`; if the callable does not support weak references (e.g. a local closure or :class:`functools.partial`), a :class:`TypeError` is raised. """ ref = self._make_ref(handler) self._subs.setdefault(event_cls, set()).add(ref)
[docs] def disconnect(self, event_cls: EventCls, handler: Handler) -> None: """Remove ``handler`` from ``event_cls``. Idempotent.""" bucket = self._subs.get(event_cls) if not bucket: return target = self._make_ref(handler) # WeakMethod and weakref.ref both implement __eq__ by comparing # the underlying referent, so set.discard works on identity of the # referenced callable. bucket.discard(target) if not bucket: del self._subs[event_cls]
# ------------------------------------------------------------------ # Dispatch # ------------------------------------------------------------------
[docs] def emit(self, event: Any) -> None: """Synchronously fan ``event`` out to handlers of its exact type. Raises :class:`TypeError` if ``event`` is not a dataclass instance. Dead weak references are pruned lazily during iteration. """ if not dataclasses.is_dataclass(event) or isinstance(event, type): raise TypeError( f"EventBus.emit requires a dataclass instance, got {type(event).__name__!r}" ) bucket = self._subs.get(type(event)) if not bucket: return # Snapshot for stable iteration; collect dead refs to prune. dead: list[weakref.ReferenceType[Any]] = [] # Preserve registration order: sets are unordered, so we sort by # insertion via a stable companion. Python sets are insertion- # ordered as an implementation detail in CPython but not # guaranteed; we rely on the spec only, so we keep a list copy # (still O(n) iteration cost). Order within a single event class # is the order of connect() calls. for ref in list(bucket): handler = ref() if handler is None: dead.append(ref) continue handler(event) if dead: for ref in dead: bucket.discard(ref) if not bucket: self._subs.pop(type(event), None)
[docs] def emit_deferred(self, event: Any) -> None: """Queue ``event`` for the next :meth:`flush_deferred` call.""" if not dataclasses.is_dataclass(event) or isinstance(event, type): raise TypeError( f"EventBus.emit_deferred requires a dataclass instance, got {type(event).__name__!r}" ) self._deferred.append(event)
[docs] def flush_deferred(self) -> None: """Drain the deferred queue, dispatching each event via :meth:`emit`.""" if not self._deferred: return # Swap-and-clear so handlers that emit_deferred during flush land # on the next flush, not the current one. pending = self._deferred self._deferred = [] for event in pending: self.emit(event)
# ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ @staticmethod def _make_ref(handler: Handler) -> weakref.ReferenceType[Any]: """Build the appropriate weak reference for ``handler``.""" if hasattr(handler, "__self__") and hasattr(handler, "__func__"): return weakref.WeakMethod(handler) try: return weakref.ref(handler) except TypeError as exc: raise TypeError( f"EventBus handler {handler!r} does not support weak references; " "use a bound method on a long-lived object or a module-level function." ) from exc