DEntryPoint
Bases: AEntryPoint
Source code in aiodistbus/entrypoint/dentrypoint.py
class DEntryPoint(AEntryPoint):
def __init__(self, pulse_ttl: Union[int, float] = 50, pulse_limit: int = 4):
super().__init__()
# Pulse
self.pulse_ttl = pulse_ttl
self.pulse_timer: Optional[Timer] = None
self.pulse_count: int = 0
self.pulse_fail: int = 0
self.pulse_limit: int = pulse_limit
self._on_disrupt: Optional[Callable] = None
# Primary ZeroMQ Client
self._running: bool = False
self._connected: asyncio.Event = asyncio.Event()
self._lock: asyncio.Lock = asyncio.Lock()
self.run_task: Optional[asyncio.Task] = None
self.ctx: Optional[zmq.asyncio.Context] = None
self.snapshot: Optional[zmq.asyncio.Socket] = None
self.subscriber: Optional[zmq.asyncio.Socket] = None
self.publisher: Optional[zmq.asyncio.Socket] = None
asyncio_atexit.register(self.close)
async def snapshot_reactor(self):
assert self.snapshot, "SNAPSHOT socket not initialized"
msg = await self.snapshot.recv_multipart()
dmsg = msg[0].decode("utf-8")
if dmsg == "aiodistbus.eventbus.handshake":
self._connected.set()
async def subscriber_reactor(self):
assert self.subscriber, "SUB socket not initialized"
[topic, event, checksum] = await self.subscriber.recv_multipart()
# Before further processing, perform checksum
if not verify_checksum(event, checksum):
logger.error(f"aiodistbus: Checksum failed: {event.decode('utf-8')}")
return
topic = topic.decode("utf-8")
event = event.decode("utf-8")
# logger.debug(f"SUBSCRIBER: Received {topic} - {event} - {len(self._received)}")
# Reconstruct the data
if topic in self._handlers:
known_type = self._handlers[topic].dtype
else:
known_type = None
try:
event = await reconstruct(event, known_type)
except Exception as e:
logger.error(f"aiodistbus: Failed to reconstruct: {event} - {e}")
return
# Obtain the handlers
coros: List[Coroutine] = []
if topic in self._handlers:
coros.append(self._handlers[topic].function(event))
for match in wildcard_search(topic, self._wildcards.keys()):
coros.append(self._wildcards[match].function(event))
# Await the handlers
if len(coros) > 0:
await asyncio.gather(*coros)
async def _run(self):
assert self.subscriber, "SUB socket not initialized"
assert self.snapshot, "SNAPSHOT socket not initialized"
# After connect and identify established, listen
while self._running:
event_list = await self.poller.poll(timeout=1000)
events = dict(event_list)
# Empty if no events
if len(events) == 0:
continue
if self.snapshot in events:
await self.snapshot_reactor()
if self.subscriber in events:
await self.subscriber_reactor()
async def _update_handlers(
self, event_type: Optional[str] = None, remove: bool = False
):
if not self.subscriber:
return
if remove and event_type:
self.subscriber.setsockopt(zmq.UNSUBSCRIBE, event_type.encode("utf-8"))
if event_type:
if "*" in event_type:
topic = event_type.replace(".*", "")
topic = topic.replace("*", "")
self.subscriber.setsockopt(zmq.SUBSCRIBE, topic.encode("utf-8"))
else:
self.subscriber.setsockopt(zmq.SUBSCRIBE, event_type.encode("utf-8"))
else:
for event_type in self._handlers.keys():
self.subscriber.setsockopt(zmq.SUBSCRIBE, event_type.encode("utf-8"))
for event_type in self._wildcards.keys():
topic = event_type.replace(".*", "")
topic = topic.replace("*", "")
self.subscriber.setsockopt(zmq.SUBSCRIBE, topic.encode("utf-8"))
async def _pulse_sub(self):
self.pulse_count += 1
async def _pulse_check(self):
# Check if there is a pulse
if self.pulse_count == 0:
self.pulse_fail += 1
else:
self.pulse_fail = 0
# If too many failures, close
if self.pulse_fail > self.pulse_limit:
logger.error("aiodistbus: Pulse failure limit reached")
if self._on_disrupt:
await self._on_disrupt()
await self.close()
return
# Reset pulse count
self.pulse_count = 0
####################################################################
## Front-Facing API
####################################################################
@property
def running(self) -> bool:
return self._running
async def emit(
self, event_type: str, data: Optional[Any] = None, id: Optional[str] = None
) -> Optional[Event]:
"""Emit an event
Args:
event_type (str): Event type
data (Any): Data to send
id (Optional[str], optional): Event ID. Defaults to None.
Returns:
Optional[Event]: Event object
"""
if not self.snapshot or not self.publisher:
logger.warning("Not connected to server")
return None
# Encode data
try:
encoded_data = encode(data)
except ValueError:
logger.error(f"aiodistbus: Failed to encode: {data}")
return None
# Obtain the dtype and format it
dtype = type(data)
dtype_str = f"{dtype.__module__}.{dtype.__name__}"
# Package data with an Event object
if id:
event = Event(event_type, encoded_data, dtype=dtype_str, id=id)
else:
event = Event(event_type, encoded_data, dtype=dtype_str)
# Serliaze the event
serialized_event = event.to_json().encode()
# Compute checksum
checksum = zlib.crc32(serialized_event)
# Send the data
# logger.debug(f"PUBLISHER: {event}")
try:
await self.publisher.send_multipart(
[
event_type.encode("utf-8"),
serialized_event,
checksum.to_bytes(4, "big"),
]
)
except zmq.error.ZMQError:
logger.error("Could not send event")
return None
return event
async def connect(
self,
ip: str,
port: int,
on_disrupt: Optional[Callable] = None,
timeout: Optional[Union[float, int]] = None,
):
"""Connect to EventBus server
Args:
ip (str): IP address
port (int): Port
on_disrupt (Optional[Callable], optional): Callback on disruption. Defaults to None.
Exceptions:
asyncio.TimeoutError: If timeout is reached
"""
self.ctx = zmq.asyncio.Context()
self.snapshot = self.ctx.socket(zmq.DEALER)
self.snapshot.setsockopt(zmq.IDENTITY, self.id.encode("utf-8"))
self.snapshot.connect(f"tcp://{ip}:{port}")
self.subscriber = self.ctx.socket(zmq.SUB)
self.subscriber.connect(f"tcp://{ip}:{port+1}")
self.publisher = self.ctx.socket(zmq.PUSH)
self.publisher.connect(f"tcp://{ip}:{port+2}")
# Avoid making any socket linger
self.snapshot.linger = 0
self.subscriber.linger = 0
self.publisher.linger = 0
# Keeping track of state
self._running = True
# Update the subscriber's topics
await self.on("aiodistbus.eventbus.close", self.close, create_task=True)
await self.on("aiodistbus.eventbus.pulse", self._pulse_sub)
await self._update_handlers()
# Using a poller for the subscriber
self.poller = zmq.asyncio.Poller()
self.poller.register(self.subscriber, zmq.POLLIN)
self.poller.register(self.snapshot, zmq.POLLIN)
self.run_task = asyncio.create_task(self._run())
# Send a connect msg
await self.snapshot.send("aiodistbus.eventbus.connect".encode("utf-8"))
if timeout:
await asyncio.wait_for(self._connected.wait(), timeout=timeout)
else:
await self._connected.wait()
# Create task check the pulse
if on_disrupt:
self._on_disrupt = on_disrupt
self.pulse_timer = Timer(self._pulse_check, self.pulse_ttl)
self.pulse_timer.start()
async def close(self):
"""Close the EventBus client"""
# If not closed already, close
async with self._lock:
if self._running:
# Stop the main task
self._running = False
self._connected.clear()
if self.run_task:
await self.run_task
# Stop the timer
if self.pulse_timer:
await self.pulse_timer.stop()
if self.ctx and not self.ctx.closed:
if self.snapshot and not self.snapshot.closed:
self.snapshot.close()
if self.subscriber and not self.subscriber.closed:
self.subscriber.close()
if self.publisher and not self.publisher.closed:
self.publisher.close()
self.ctx.term()
close()
async
Close the EventBus client
aiodistbus/entrypoint/dentrypoint.py
async def close(self):
"""Close the EventBus client"""
# If not closed already, close
async with self._lock:
if self._running:
# Stop the main task
self._running = False
self._connected.clear()
if self.run_task:
await self.run_task
# Stop the timer
if self.pulse_timer:
await self.pulse_timer.stop()
if self.ctx and not self.ctx.closed:
if self.snapshot and not self.snapshot.closed:
self.snapshot.close()
if self.subscriber and not self.subscriber.closed:
self.subscriber.close()
if self.publisher and not self.publisher.closed:
self.publisher.close()
self.ctx.term()
connect(ip, port, on_disrupt=None, timeout=None)
async
Connect to EventBus server
| Parameters: |
|
|---|
| Raises: |
|
|---|
aiodistbus/entrypoint/dentrypoint.py
async def connect(
self,
ip: str,
port: int,
on_disrupt: Optional[Callable] = None,
timeout: Optional[Union[float, int]] = None,
):
"""Connect to EventBus server
Args:
ip (str): IP address
port (int): Port
on_disrupt (Optional[Callable], optional): Callback on disruption. Defaults to None.
Exceptions:
asyncio.TimeoutError: If timeout is reached
"""
self.ctx = zmq.asyncio.Context()
self.snapshot = self.ctx.socket(zmq.DEALER)
self.snapshot.setsockopt(zmq.IDENTITY, self.id.encode("utf-8"))
self.snapshot.connect(f"tcp://{ip}:{port}")
self.subscriber = self.ctx.socket(zmq.SUB)
self.subscriber.connect(f"tcp://{ip}:{port+1}")
self.publisher = self.ctx.socket(zmq.PUSH)
self.publisher.connect(f"tcp://{ip}:{port+2}")
# Avoid making any socket linger
self.snapshot.linger = 0
self.subscriber.linger = 0
self.publisher.linger = 0
# Keeping track of state
self._running = True
# Update the subscriber's topics
await self.on("aiodistbus.eventbus.close", self.close, create_task=True)
await self.on("aiodistbus.eventbus.pulse", self._pulse_sub)
await self._update_handlers()
# Using a poller for the subscriber
self.poller = zmq.asyncio.Poller()
self.poller.register(self.subscriber, zmq.POLLIN)
self.poller.register(self.snapshot, zmq.POLLIN)
self.run_task = asyncio.create_task(self._run())
# Send a connect msg
await self.snapshot.send("aiodistbus.eventbus.connect".encode("utf-8"))
if timeout:
await asyncio.wait_for(self._connected.wait(), timeout=timeout)
else:
await self._connected.wait()
# Create task check the pulse
if on_disrupt:
self._on_disrupt = on_disrupt
self.pulse_timer = Timer(self._pulse_check, self.pulse_ttl)
self.pulse_timer.start()
emit(event_type, data=None, id=None)
async
Emit an event
| Parameters: |
|
|---|
| Returns: |
|
|---|
aiodistbus/entrypoint/dentrypoint.py
async def emit(
self, event_type: str, data: Optional[Any] = None, id: Optional[str] = None
) -> Optional[Event]:
"""Emit an event
Args:
event_type (str): Event type
data (Any): Data to send
id (Optional[str], optional): Event ID. Defaults to None.
Returns:
Optional[Event]: Event object
"""
if not self.snapshot or not self.publisher:
logger.warning("Not connected to server")
return None
# Encode data
try:
encoded_data = encode(data)
except ValueError:
logger.error(f"aiodistbus: Failed to encode: {data}")
return None
# Obtain the dtype and format it
dtype = type(data)
dtype_str = f"{dtype.__module__}.{dtype.__name__}"
# Package data with an Event object
if id:
event = Event(event_type, encoded_data, dtype=dtype_str, id=id)
else:
event = Event(event_type, encoded_data, dtype=dtype_str)
# Serliaze the event
serialized_event = event.to_json().encode()
# Compute checksum
checksum = zlib.crc32(serialized_event)
# Send the data
# logger.debug(f"PUBLISHER: {event}")
try:
await self.publisher.send_multipart(
[
event_type.encode("utf-8"),
serialized_event,
checksum.to_bytes(4, "big"),
]
)
except zmq.error.ZMQError:
logger.error("Could not send event")
return None
return event