EventBus

Bases: AEventBus

Eventbus

This class is the eventbus. It handles all subscriptions and emits events to the appropriate handlers.

Source code in aiodistbus/eventbus/eventbus.py
class EventBus(AEventBus):
    """Eventbus

    This class is the eventbus. It handles all subscriptions and emits events to
    the appropriate handlers.

    """

    def __init__(self, debug: bool = False):
        super().__init__()
        self._running = True
        self._debug = debug
        self._wildcard_subs: Dict[str, Dict[str, Subscriptions]] = defaultdict(dict)
        self._dtypes: Dict[str, Union[Type, None]] = {}
        self._dentrypoints: Dict[str, DEntryPoint] = {}

    async def _on(self, id: str, handler: Handler):
        sub = Subscriptions(id, handler)
        if "*" in handler.event_type:
            self._wildcard_subs[handler.event_type][id] = sub
        else:
            self._subs[handler.event_type][id] = sub
            self._dtypes[handler.event_type] = handler.dtype

    async def _off(self, id: str, event_type: str):
        if "*" in event_type:
            del self._wildcard_subs[event_type][id]
        else:
            del self._subs[event_type][id]
            del self._dtypes[event_type]

    def _remove(self, id: str):
        to_be_removed: List[str] = []
        for route, subs in self._subs.items():
            if id in subs:
                del self._subs[route][id]
                if len(self._subs[route]) == 0:
                    to_be_removed.append(route)

        for route in to_be_removed:
            del self._subs[route]
            del self._dtypes[route]

    async def _exec(
        self, coros: List[Coroutine], event: Event, subs: Iterable[Subscriptions]
    ):

        for sub in subs:
            # If async function, await it
            if asyncio.iscoroutinefunction(sub.handler.function):
                coros.append(sub.handler.function(event))
            else:
                sub.handler.function(event)

    async def _emit(self, event: Event):

        coros: List[Coroutine] = []

        # Debug
        if self._debug:
            logger.debug(f"aiodistbus: Event={event}")

        # Handle wildcard subscriptions
        for match in wildcard_search(event.type, self._wildcard_subs.keys()):
            await self._exec(coros, event, self._wildcard_subs[match].values())

        # Else, normal subscriptions
        await self._exec(coros, event, self._subs[event.type].values())

        # Wait for all async functions to finish
        if len(coros) > 0:
            await asyncio.gather(*coros)

    ####################################################################
    ## Front-Facing API
    ####################################################################

    async def forward(
        self, ip: str, port: int, event_types: Optional[List[str]] = None
    ):
        """Forward events to another eventbus

        Args:
            ip (str): IP address of the eventbus
            port (int): Port of the eventbus
            event_types (Optional[List[str]], optional): Event types to forward. Defaults to None.

        Examples:
            >>> bus = EventBus()
            >>> dbus = DEventBus()
            >>> await bus.forward(dbus.ip, dbus.port, ["hello"])

        """
        from ..entrypoint import DEntryPoint

        # Handle default event types
        if event_types is None:
            event_types = ["*"]

        # Create entrypoint
        e = DEntryPoint()
        await e.connect(ip, port)
        for event_type in event_types:

            async def _wrapper(event: Event):
                await e.emit(event.type, event.data, event.id)

            handler = Handler(event_type, _wrapper)
            await self._on(f"{ip}:{port}", handler)

        # Store the entrypoint
        self._dentrypoints[f"{ip}:{port}"] = e

    async def deforward(self, ip: str, port: int):
        """Remove forwarding to another eventbus

        Args:
            ip (str): IP address of the eventbus
            port (int): Port of the eventbus

        """
        # Remove handlers
        self._remove(f"{ip}:{port}")

        # Remove entrypoint
        await self._dentrypoints[f"{ip}:{port}"].close()
        del self._dentrypoints[f"{ip}:{port}"]

    async def listen(self, ip: str, port: int, event_types: Optional[List[str]] = None):
        from ..entrypoint import DEntryPoint

        # Handle default event types
        if event_types is None:
            event_types = ["*"]

        # Create entrypoint
        e = DEntryPoint()
        await e.connect(ip, port)
        for event_type in event_types:

            # import pdb; pdb.set_trace()

            async def _wrapper(event):
                await self._emit(event)

            await e.on(event_type, _wrapper, unpack=False)

        # Store the entrypoint
        self._dentrypoints[f"{ip}:{port}"] = e

    async def link(
        self,
        ip: str,
        port: int,
        to_event_types: Optional[List[str]] = None,
        from_event_types: Optional[List[str]] = None,
    ):
        await self.listen(ip, port, event_types=from_event_types)
        await self.forward(ip, port, event_types=to_event_types)

    async def close(self):
        """Close the eventbus"""
        # Emit first to allow for cleanup
        await self._emit(Event("aiodistbus.eventbus.close"))
        self._running = False

        # Close all entrypoints
        for e in self._dentrypoints.values():
            await e.close()

close() async

Close the eventbus

aiodistbus/eventbus/eventbus.py
async def close(self):
    """Close the eventbus"""
    # Emit first to allow for cleanup
    await self._emit(Event("aiodistbus.eventbus.close"))
    self._running = False

    # Close all entrypoints
    for e in self._dentrypoints.values():
        await e.close()

deforward(ip, port) async

Remove forwarding to another eventbus

Parameters:
  • ip (str) –

    IP address of the eventbus

  • port (int) –

    Port of the eventbus

aiodistbus/eventbus/eventbus.py
async def deforward(self, ip: str, port: int):
    """Remove forwarding to another eventbus

    Args:
        ip (str): IP address of the eventbus
        port (int): Port of the eventbus

    """
    # Remove handlers
    self._remove(f"{ip}:{port}")

    # Remove entrypoint
    await self._dentrypoints[f"{ip}:{port}"].close()
    del self._dentrypoints[f"{ip}:{port}"]

forward(ip, port, event_types=None) async

Forward events to another eventbus

Parameters:
  • ip (str) –

    IP address of the eventbus

  • port (int) –

    Port of the eventbus

  • event_types (Optional[List[str]], default: None ) –

    Event types to forward. Defaults to None.

>>> bus = EventBus()
>>> dbus = DEventBus()
>>> await bus.forward(dbus.ip, dbus.port, ["hello"])
aiodistbus/eventbus/eventbus.py
async def forward(
    self, ip: str, port: int, event_types: Optional[List[str]] = None
):
    """Forward events to another eventbus

    Args:
        ip (str): IP address of the eventbus
        port (int): Port of the eventbus
        event_types (Optional[List[str]], optional): Event types to forward. Defaults to None.

    Examples:
        >>> bus = EventBus()
        >>> dbus = DEventBus()
        >>> await bus.forward(dbus.ip, dbus.port, ["hello"])

    """
    from ..entrypoint import DEntryPoint

    # Handle default event types
    if event_types is None:
        event_types = ["*"]

    # Create entrypoint
    e = DEntryPoint()
    await e.connect(ip, port)
    for event_type in event_types:

        async def _wrapper(event: Event):
            await e.emit(event.type, event.data, event.id)

        handler = Handler(event_type, _wrapper)
        await self._on(f"{ip}:{port}", handler)

    # Store the entrypoint
    self._dentrypoints[f"{ip}:{port}"] = e