# M2-PT-DRP/source/managers/EventManager.py
#
# 53 lines
# 1.8 KiB
# Python

import typing
import warnings
from source import packets
from source.behaviors import events
from source.managers import Manager
class EventManager:
    """
    Dispatch received packets to the event handler registered for their type.

    Handlers are kept in a dictionary keyed by the exact class of the packet,
    so a packet is only matched by a handler registered for ``type(packet)``.
    """

    def __init__(self, manager: "Manager"):
        """
        :param manager: the owning manager; ``loop`` reads its ``communication`` and ``event`` attributes
        """
        self.manager = manager

        # events: maps a packet class to the handler in charge of that kind of packet
        self.event_handlers: dict[typing.Type[packets.base.BasePacket], events.base.BaseEvent] = {}

    def register_event_handler(self, packet_type: typing.Type[packets.base.BasePacket], event: events.base.BaseEvent) -> None:
        """
        Register a new event to react to a specific packet type.

        Registering a second handler for the same packet type replaces the previous one.

        :param packet_type: the type of packet to listen to
        :param event: the event handler
        """
        self.event_handlers[packet_type] = event

    def handle(self, packet: packets.base.BasePacket, address: tuple) -> None:
        """
        Handle the packet received.

        :param packet: the packet to handle
        :param address: the address of the machine that sent the packet
        :raises KeyError: if no handler has been registered for the exact type of the packet
        """
        # get the event handler of this kind of packet (exact type match, subclasses are not resolved)
        event_handler = self.event_handlers.get(type(packet))
        if event_handler is None:
            raise KeyError(f"Unrecognised packet type: {type(packet)}. Has it been registered ?")

        # use the event handler on the packet
        event_handler.handle(packet, address)

    def loop(self) -> None:
        """
        Receive packets and dispatch each of them to its event handler until interrupted.

        The loop returns when a ``KeyboardInterrupt`` is raised. Any other exception raised while
        receiving or handling a packet is reported through ``warnings.warn`` and the loop goes on
        with the next packet.
        """
        while True:
            try:
                # wait for a new packet
                packet, address = self.manager.communication.receive()
                print(f"Received message from {address}: {packet}")

                # give it to the event handler
                # NOTE(review): dispatches through manager.event, presumably this same instance - confirm
                self.manager.event.handle(packet, address)

            except KeyboardInterrupt:
                print("Stopping listener.")
                # actually leave the loop: previously the message was printed but the listener kept running
                return

            except Exception as exception:
                # a faulty packet or handler must not bring the listener down
                warnings.warn(str(exception))