DEventBus

Bases: AEventBus

Distributed eventbus

This class is the distributed eventbus. This is the server in that broadcasts events to all clients. It also handles the local eventbuses and forwards events to them.

Source code in aiodistbus/eventbus/deventbus.py
class DEventBus(AEventBus):
    """Distributed eventbus

    This class is the distributed eventbus. This is the server in that
    broadcasts events to all clients. It also handles the local eventbuses
    and forwards events to them.

    """

    def __init__(
        self, ip: str = "127.0.0.1", port: int = 0, pulse: Union[float, int] = 15
    ):
        """Initialize the distributed eventbus

        Args:
            ip (str): IP address to bind to. Defaults to '127.0.0.1'
            port (int, optional): Port to bind to. Defaults to 0.
            pulse (Union[float, int], optional): Pulse interval. Defaults to 15.

        """
        super().__init__()

        # Parameters
        self._ip: str = ip
        self._port: int = port
        self._running: bool = False

        # Set up clone server sockets
        self.ctx = zmq.asyncio.Context()
        self.snapshot = self.ctx.socket(zmq.ROUTER)
        self.publisher = self.ctx.socket(zmq.PUB)
        self.collector = self.ctx.socket(zmq.PULL)

        if port == 0:
            port = self.snapshot.bind_to_random_port(f"tcp://{ip}")
            self.publisher.bind(f"tcp://{ip}:{port+1}")
            self.collector.bind(f"tcp://{ip}:{port+2}")
            self._port = port
        else:
            self.snapshot.bind(f"tcp://{ip}:{port}")
            self.publisher.bind(f"tcp://{ip}:{port+1}")
            self.collector.bind(f"tcp://{ip}:{port+2}")

        # Create poller to listen to snapshot and collector
        self.poller = zmq.asyncio.Poller()
        self.poller.register(self.snapshot, zmq.POLLIN)
        self.poller.register(self.collector, zmq.POLLIN)

        self._running = True
        self._flush_flag = asyncio.Event()
        self._flush_flag.clear()
        self.run_task = asyncio.create_task(self._run())

        # Create a timer to pulse to all clients
        # Letting them know if the server is alive
        self.timer = Timer(self._pulse, pulse)
        self.timer.start()

        asyncio_atexit.register(self.close)

        # Local event buses
        self._lbuses_wildcard: Dict[str, List[EventBus]] = defaultdict(list)
        self._lbuses_subs: Dict[str, List[EventBus]] = defaultdict(list)

    @property
    def ip(self):
        return self._ip

    @property
    def port(self):
        return self._port

    async def _emit(self, topic: bytes, msg: bytes, checksum: Optional[bytes] = None):
        if checksum is None:
            checksum = zlib.crc32(msg).to_bytes(4, "big")
        await self.publisher.send_multipart([topic, msg, checksum])

    async def _snapshot_reactor(self, id: bytes, msg: bytes):
        # logger.debug(f"ROUTER: Received {id}: {msg}")

        # Decode message
        dmsg = msg.decode()
        if dmsg == "aiodistbus.eventbus.connect":
            await self.snapshot.send_multipart([id, b"aiodistbus.eventbus.handshake"])

    async def _collector_reactor(self, topic: bytes, msg: bytes, checksum: bytes):

        # Broadcast via socket
        await self._emit(topic, msg, checksum)

        # Only perform this if we have local buses
        if len(self._lbuses_wildcard) == 0 and len(self._lbuses_subs) == 0:
            return

        # If local buses, send them the data
        dtopic = topic.decode()

        # Handle wildcard subscriptions
        bus_to_emit: List[EventBus] = []
        if dtopic not in EVENT_BLACKLIST:
            for match in wildcard_search(dtopic, self._lbuses_wildcard.keys()):
                for bus in self._lbuses_wildcard[match]:
                    # logger.debug(f"{dtopic}: {msg}")
                    bus_to_emit.append(bus)

        # Else, normal subscriptions
        if dtopic in self._lbuses_subs:
            for bus in self._lbuses_subs[dtopic]:
                # logger.debug(f"{dtopic}: {msg}")
                bus_to_emit.append(bus)

        # Identify if any bus has the dtype
        known_type: Optional[Type] = None
        for bus in bus_to_emit:
            if dtopic in bus._dtypes:
                known_type = bus._dtypes[dtopic]

        # Reconstruct the data
        event = await reconstruct(msg.decode(), known_type)

        # Emit the event
        for bus in bus_to_emit:
            await bus._emit(event)

    async def _run(self):
        while self._running:

            event_list = await self.poller.poll(timeout=1000)
            events = dict(event_list)

            # Empty if no events
            if len(events) == 0:
                self._flush_flag.set()
                continue

            if self.snapshot in events:
                [id, msg] = await self.snapshot.recv_multipart()
                await self._snapshot_reactor(id, msg)

            if self.collector in events:
                [topic, data, checksum] = await self.collector.recv_multipart()

                # Check if the checksum is correct
                if verify_checksum(data, checksum):
                    await self._collector_reactor(topic, data, checksum)
                else:
                    logger.error("aiodistbus: Checksum failed for %s", topic.decode())

    async def _pulse(self):
        event_d = Event("aiodistbus.eventbus.pulse").to_json().encode()
        await self._emit(b"aiodistbus.eventbus.pulse", event_d)

    ####################################################################
    ## Front-Facing API
    ####################################################################

    async def flush(self):
        """Flush the eventbus"""
        self._flush_flag.clear()
        await self._flush_flag.wait()

    async def forward(self, bus: EventBus, event_types: Optional[List[str]] = None):
        """Forward events to a local eventbus

        Args:
            bus (EventBus): Local eventbus
            event_types (Optional[List[str]], optional): Event types to forward. Defaults to None.

        Exapmles:
            >>> bus = EventBus()
            >>> dbus = DEventBus()
            >>> await dbus.forward(bus, ["hello"])

        """
        # Handle default event types
        if event_types is None:
            event_types = ["*"]

        # Link
        for event_type in event_types:
            if "*" in event_type:
                self._lbuses_wildcard[event_type].append(bus)
            else:
                self._lbuses_subs[event_type].append(bus)

    async def close(self):
        """Close the eventbus"""

        if self._running:

            # Inform to stop
            event_d = Event("aiodistbus.eventbus.close").to_json().encode()
            await self._emit(b"aiodistbus.eventbus.close", event_d)

            # Stop the main routine
            self._running = False
            await self.run_task

            # Stop the pulse
            await self.timer.stop()

            # Close sockets
            self.snapshot.close()
            self.publisher.close()
            self.collector.close()
            self.ctx.term()

__init__(ip='127.0.0.1', port=0, pulse=15)

Initialize the distributed eventbus

Parameters:
  • ip (str, default: '127.0.0.1' ) –

    IP address to bind to. Defaults to '127.0.0.1'

  • port (int, default: 0 ) –

    Port to bind to. Defaults to 0.

  • pulse (Union[float, int], default: 15 ) –

    Pulse interval. Defaults to 15.

aiodistbus/eventbus/deventbus.py
def __init__(
    self, ip: str = "127.0.0.1", port: int = 0, pulse: Union[float, int] = 15
):
    """Initialize the distributed eventbus

    Args:
        ip (str): IP address to bind to. Defaults to '127.0.0.1'
        port (int, optional): Port to bind to. Defaults to 0.
        pulse (Union[float, int], optional): Pulse interval. Defaults to 15.

    """
    super().__init__()

    # Parameters
    self._ip: str = ip
    self._port: int = port
    self._running: bool = False

    # Set up clone server sockets
    self.ctx = zmq.asyncio.Context()
    self.snapshot = self.ctx.socket(zmq.ROUTER)
    self.publisher = self.ctx.socket(zmq.PUB)
    self.collector = self.ctx.socket(zmq.PULL)

    if port == 0:
        port = self.snapshot.bind_to_random_port(f"tcp://{ip}")
        self.publisher.bind(f"tcp://{ip}:{port+1}")
        self.collector.bind(f"tcp://{ip}:{port+2}")
        self._port = port
    else:
        self.snapshot.bind(f"tcp://{ip}:{port}")
        self.publisher.bind(f"tcp://{ip}:{port+1}")
        self.collector.bind(f"tcp://{ip}:{port+2}")

    # Create poller to listen to snapshot and collector
    self.poller = zmq.asyncio.Poller()
    self.poller.register(self.snapshot, zmq.POLLIN)
    self.poller.register(self.collector, zmq.POLLIN)

    self._running = True
    self._flush_flag = asyncio.Event()
    self._flush_flag.clear()
    self.run_task = asyncio.create_task(self._run())

    # Create a timer to pulse to all clients
    # Letting them know if the server is alive
    self.timer = Timer(self._pulse, pulse)
    self.timer.start()

    asyncio_atexit.register(self.close)

    # Local event buses
    self._lbuses_wildcard: Dict[str, List[EventBus]] = defaultdict(list)
    self._lbuses_subs: Dict[str, List[EventBus]] = defaultdict(list)

close() async

Close the eventbus

aiodistbus/eventbus/deventbus.py
async def close(self):
    """Close the eventbus"""

    if self._running:

        # Inform to stop
        event_d = Event("aiodistbus.eventbus.close").to_json().encode()
        await self._emit(b"aiodistbus.eventbus.close", event_d)

        # Stop the main routine
        self._running = False
        await self.run_task

        # Stop the pulse
        await self.timer.stop()

        # Close sockets
        self.snapshot.close()
        self.publisher.close()
        self.collector.close()
        self.ctx.term()

flush() async

Flush the eventbus

aiodistbus/eventbus/deventbus.py
async def flush(self):
    """Flush the eventbus"""
    self._flush_flag.clear()
    await self._flush_flag.wait()

forward(bus, event_types=None) async

Forward events to a local eventbus

Parameters:
  • bus (EventBus) –

    Local eventbus

  • event_types (Optional[List[str]], default: None ) –

    Event types to forward. Defaults to None.

Exapmles

bus = EventBus() dbus = DEventBus() await dbus.forward(bus, ["hello"])

aiodistbus/eventbus/deventbus.py
async def forward(self, bus: EventBus, event_types: Optional[List[str]] = None):
    """Forward events to a local eventbus

    Args:
        bus (EventBus): Local eventbus
        event_types (Optional[List[str]], optional): Event types to forward. Defaults to None.

    Exapmles:
        >>> bus = EventBus()
        >>> dbus = DEventBus()
        >>> await dbus.forward(bus, ["hello"])

    """
    # Handle default event types
    if event_types is None:
        event_types = ["*"]

    # Link
    for event_type in event_types:
        if "*" in event_type:
            self._lbuses_wildcard[event_type].append(bus)
        else:
            self._lbuses_subs[event_type].append(bus)