import traceback
import typing
import warnings

from source import packets
from source.behaviors import events
from source.error import UntrustedPeerException
from source.managers import Manager


class EventManager:
    """
    Receive packets sent by other peers and dispatch each one to the event
    handler registered for its packet type.
    """

    def __init__(self, manager: "Manager"):
        """
        Create an event manager with no handler registered yet

        :param manager: the manager this event manager belongs to
        """

        self.manager = manager

        # registry mapping a packet type to the event handler in charge of it
        self.event_handlers: dict[typing.Type[packets.base.BasePacket], events.base.BaseEvent] = {}

    def register_event_handler(
        self,
        packet_type: typing.Type[packets.base.BasePacket],
        event: events.base.BaseEvent,
    ) -> None:
        """
        Associate an event handler with a packet type.
        Registering the same packet type again replaces the previous handler.

        :param packet_type: the type of packet to listen to
        :param event: the event handler
        """

        self.event_handlers[packet_type] = event

    def handle(self, packet: packets.base.BasePacket, address: tuple) -> None:
        """
        Dispatch a received packet to the handler registered for its exact type

        :param packet: the packet to handle
        :param address: the address of the machine that sent the packet
        :raises KeyError: if no handler is registered for the type of the packet
        """

        # the lookup uses the exact class of the packet: parent classes are not searched
        handler = self.event_handlers.get(type(packet))

        if handler is not None:
            # delegate the packet to its handler
            handler.handle(packet, address)
            return

        raise KeyError(f"Unrecognised packet type: {type(packet)}. Has it been registered ?")

    def loop(self) -> None:
        """
        Receive and handle packets forever.
        A packet from an untrusted peer is ignored, and any other error is
        reported as a warning; in both cases the loop keeps running.
        """

        while True:
            try:
                # wait for a new packet
                received = self.manager.communication.receive()
                packet, address = received
                print(f"Received message from {address}: {packet}")

                # dispatch through the manager's event component
                # NOTE(review): presumably self.manager.event is this very instance - confirm
                self.manager.event.handle(packet, address)

            except UntrustedPeerException:
                print("Ignored: untrusted peer.")

            except Exception:  # NOQA
                # report the failure with its traceback instead of stopping the loop
                details = traceback.format_exc()
                warnings.warn(details)