Utils
Make a dataclass evented
| Parameters: |
|
|---|
| Returns: |
|
|---|
>>> from dataclasses import dataclass
>>> from aiodistbus import EventBus, make_evented
>>> bus = EventBus()
>>> @dataclass
... class Test:
... a: int = 0
... b: int = 0
...
>>> test = Test()
>>> test = make_evented(test, bus)
>>> test.a = 1 # Event emitted
aiodistbus/wrapper.py
def make_evented(
instance: T,
bus: EventBus,
event_name: Optional[str] = None,
object: Optional[Any] = None,
debounce_interval: float = 0.1,
) -> T:
"""Make a dataclass evented
Args:
instance (T): Instance of dataclass
bus (EventBus): EventBus
event_name (Optional[str], optional): Name of the event. Defaults to None.
object (Optional[Any], optional): Object to send with event. Defaults to None.
debounce_interval (float, optional): Debounce interval. Defaults to 0.1.
Returns:
T: Evented dataclass
Examples:
>>> from dataclasses import dataclass
>>> from aiodistbus import EventBus, make_evented
>>> bus = EventBus()
>>> @dataclass
... class Test:
... a: int = 0
... b: int = 0
...
>>> test = Test()
>>> test = make_evented(test, bus)
>>> test.a = 1 # Event emitted
"""
instance.bus = bus # type: ignore[attr-defined]
instance.__evented_values = {} # type: ignore[attr-defined]
# Name of the event
if not event_name:
event_name = f"{instance.__class__.__name__}.changed"
# Dynamically create a new class with the same name as the instance's class
new_class_name = instance.__class__.__name__
NewClass = type(new_class_name, (instance.__class__,), {})
def make_property(name: str):
"""Make a property
Args:
name (str): Name of the property
"""
def getter(self):
return self.__evented_values.get(name)
def setter(self, value):
self.__evented_values[name] = value
if object:
event_data = object
else:
event_data = self
event = Event(str(event_name), event_data)
debounce_emit(instance, event, bus, debounce_interval)
return property(getter, setter)
def callback(key: str, value: Any):
"""Callback for ObservableDict and ObservableList"""
if object:
event_data = object
else:
event_data = instance
event = Event(str(event_name), event_data)
debounce_emit(instance, event, bus, debounce_interval)
# Wrap the callback
wrap_callback(NewClass, instance, bus, event_name, callback, make_property)
# Change the class of the instance
instance.__class__ = NewClass
return instance