Source code for simvx.core.event_bus
"""Typed event bus with weak handler references.
Handlers connect against a *dataclass* event type. Calling
:meth:`EventBus.emit` with an instance of that dataclass synchronously fans
the event out to every live handler connected for the *exact* class.
There is no MRO walk: a handler for ``Parent`` does NOT receive emits of
``Child(Parent)`` -- subclasses must connect themselves. This keeps
dispatch predictable and O(1) per event class.
Handlers are stored as weak references:
* Bound methods are wrapped in :class:`weakref.WeakMethod` so the
connection is automatically dropped when the owning instance is garbage
collected.
* Free functions and other callables are stored via :class:`weakref.ref`.
CPython 3.13+ supports weakrefs to module-level functions, but local
closures and ``functools.partial`` objects do not. Those raise
:class:`TypeError` at connect time -- callers must use a method on a
long-lived object, or hold their own strong reference and pass a
module-level function.
Two dispatch modes:
* :meth:`emit` -- synchronous. Handlers run in registration order on the
calling thread before ``emit`` returns.
* :meth:`emit_deferred` -- queues the event. The queue is drained when
:meth:`flush_deferred` is called (the SceneTree integration in C2 will
call this once per frame).
Threading: the bus assumes a single-threaded engine. There are no locks.
Calling ``emit`` from a non-main thread is undefined behaviour.
"""
from __future__ import annotations
import dataclasses
import weakref
from collections.abc import Callable
from typing import Any
EventCls = type
Handler = Callable[[Any], None]
[docs]
class EventBus:
"""Typed publish/subscribe bus keyed on dataclass event types.
Uses ``connect``/``disconnect`` to mirror :class:`Signal` vocabulary so
both callback systems share one verb pair across the engine.
"""
__slots__ = ("_subs", "_deferred")
def __init__(self) -> None:
self._subs: dict[EventCls, set[weakref.ReferenceType[Any]]] = {}
self._deferred: list[Any] = []
# ------------------------------------------------------------------
# Subscription
# ------------------------------------------------------------------
[docs]
def connect(self, event_cls: EventCls, handler: Handler) -> None:
"""Register ``handler`` for events of exactly ``event_cls``.
Bound methods are stored via :class:`weakref.WeakMethod`. Other
callables use :class:`weakref.ref`; if the callable does not
support weak references (e.g. a local closure or
:class:`functools.partial`), a :class:`TypeError` is raised.
"""
ref = self._make_ref(handler)
self._subs.setdefault(event_cls, set()).add(ref)
[docs]
def disconnect(self, event_cls: EventCls, handler: Handler) -> None:
"""Remove ``handler`` from ``event_cls``. Idempotent."""
bucket = self._subs.get(event_cls)
if not bucket:
return
target = self._make_ref(handler)
# WeakMethod and weakref.ref both implement __eq__ by comparing
# the underlying referent, so set.discard works on identity of the
# referenced callable.
bucket.discard(target)
if not bucket:
del self._subs[event_cls]
# ------------------------------------------------------------------
# Dispatch
# ------------------------------------------------------------------
[docs]
def emit(self, event: Any) -> None:
"""Synchronously fan ``event`` out to handlers of its exact type.
Raises :class:`TypeError` if ``event`` is not a dataclass instance.
Dead weak references are pruned lazily during iteration.
"""
if not dataclasses.is_dataclass(event) or isinstance(event, type):
raise TypeError(
f"EventBus.emit requires a dataclass instance, got {type(event).__name__!r}"
)
bucket = self._subs.get(type(event))
if not bucket:
return
# Snapshot for stable iteration; collect dead refs to prune.
dead: list[weakref.ReferenceType[Any]] = []
# Preserve registration order: sets are unordered, so we sort by
# insertion via a stable companion. Python sets are insertion-
# ordered as an implementation detail in CPython but not
# guaranteed; we rely on the spec only, so we keep a list copy
# (still O(n) iteration cost). Order within a single event class
# is the order of connect() calls.
for ref in list(bucket):
handler = ref()
if handler is None:
dead.append(ref)
continue
handler(event)
if dead:
for ref in dead:
bucket.discard(ref)
if not bucket:
self._subs.pop(type(event), None)
[docs]
def emit_deferred(self, event: Any) -> None:
"""Queue ``event`` for the next :meth:`flush_deferred` call."""
if not dataclasses.is_dataclass(event) or isinstance(event, type):
raise TypeError(
f"EventBus.emit_deferred requires a dataclass instance, got {type(event).__name__!r}"
)
self._deferred.append(event)
[docs]
def flush_deferred(self) -> None:
"""Drain the deferred queue, dispatching each event via :meth:`emit`."""
if not self._deferred:
return
# Swap-and-clear so handlers that emit_deferred during flush land
# on the next flush, not the current one.
pending = self._deferred
self._deferred = []
for event in pending:
self.emit(event)
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
@staticmethod
def _make_ref(handler: Handler) -> weakref.ReferenceType[Any]:
"""Build the appropriate weak reference for ``handler``."""
if hasattr(handler, "__self__") and hasattr(handler, "__func__"):
return weakref.WeakMethod(handler)
try:
return weakref.ref(handler)
except TypeError as exc:
raise TypeError(
f"EventBus handler {handler!r} does not support weak references; "
"use a bound method on a long-lived object or a module-level function."
) from exc