Distributed EventBus & DEntryPoint

For the distributed eventbus implementation, the data transmitted within the bus needs to be serializable.

Note

Make sure to use DataClassJsonMixin when using a dataclass to permit the serialization of the data and documenting the dtypes.

from dataclasses import dataclass
from dataclasses_json import DataClassJsonMixin
from aiodistbus import DEntryPoint, DEventBus

The handlers for events is the same as the local eventbus implementation, making interoperability feasible.

@dataclass
class ExampleEvent(DataClassJsonMixin):
    msg: str

async def handler(event: ExampleEvent):
    assert isinstance(event, ExampleEvent)
    logger.info(f"Received event {event}")

After the configuration, we have to create the necessary server-client resources and connect the setup:

# Create resources
dbus = DEventBus()
e1, e2 = DEntryPoint(), DEntryPoint()

# Add handlers
await e1.on('example', handler, ExampleEvent)

# Connect
await e1.connect(dbus.ip, dbus.port)
await e2.connect(dbus.ip, dbus.port)

With everything configured, we can not emit messages between the entrypoints:

# Send message and e1's handler is executed
event = await e2.emit('example', ExampleEvent(msg="hello"))

Make sure to close the resources at the end of the program.

# Closing (order doesn't matter)
await bus.close()
await e1.close()
await e2.close()