diff --git a/source/Peer.py b/source/Peer.py deleted file mode 100644 index e69de29..0000000 diff --git a/source/behaviors/events/DiscoveryEvent.py b/source/behaviors/events/DiscoveryEvent.py index 8b60c1e..7a21c6e 100644 --- a/source/behaviors/events/DiscoveryEvent.py +++ b/source/behaviors/events/DiscoveryEvent.py @@ -16,4 +16,5 @@ class DiscoveryEvent(base.BaseEvent): isinstance(self.manager.role, roles.MasterRole) ) # send our information back + # don't use any encryption to share the RSA key for further communication self.manager.communication.send(peerPacket, CipherType.PLAIN, address) diff --git a/source/behaviors/events/PeerEvent.py b/source/behaviors/events/PeerEvent.py index 7587962..881e51c 100644 --- a/source/behaviors/events/PeerEvent.py +++ b/source/behaviors/events/PeerEvent.py @@ -1,5 +1,5 @@ from . import base -from source import packets +from source import packets, structures class PeerEvent(base.BaseEvent): @@ -8,8 +8,8 @@ class PeerEvent(base.BaseEvent): """ def handle(self, packet: packets.PeerPacket, address: tuple): - # check if the peer is new - if address not in self.manager.peers: - # add the peer to the peers list - self.manager.peers[address] = packet - print("new peer discovered !") + # update our peers database to add new peer information + self.manager.peer.peers[address] = structures.Peer( + public_key=packet.public_key, + master=packet.master, + ) diff --git a/source/behaviors/roles/SlaveRole.py b/source/behaviors/roles/SlaveRole.py index 3e37f1c..680f81a 100644 --- a/source/behaviors/roles/SlaveRole.py +++ b/source/behaviors/roles/SlaveRole.py @@ -1,5 +1,7 @@ -from source import managers, packets -from source.behaviors.roles import base +from datetime import timedelta, datetime + +from source import managers, packets, structures +from source.behaviors.roles import base, UndefinedRole from source.utils.crypto.type import CipherType @@ -16,9 +18,13 @@ class SlaveRole(base.BaseRole): self.master_address = master_address def handle(self): - # TODO(Faraphel): ping the master and check if it is working properly. Return to undefined if no. - - # NOTE(Faraphel): the secret key might be stored somewhere else than here, or need to be reset + # if we don't have any secret key for this server, request it + # TODO(Faraphel): the secret key might be stored somewhere else than here, or need to be reset if self.manager.communication.secret_key is None: packet = packets.RequestKeyPacket() self.manager.communication.send(packet, CipherType.AES_CBC, self.master_address) + + # check if the master interacted recently + master_peer: structures.Peer = self.manager.peer.peers[self.master_address] + if datetime.now() - master_peer.last_interaction > timedelta(seconds=10): + self.manager.role.current = UndefinedRole(self.manager) diff --git a/source/behaviors/roles/UndefinedRole.py b/source/behaviors/roles/UndefinedRole.py index a593897..3a10221 100644 --- a/source/behaviors/roles/UndefinedRole.py +++ b/source/behaviors/roles/UndefinedRole.py @@ -1,54 +1,32 @@ import time from datetime import datetime, timedelta -from . import base, MasterRole - -from source import packets -from .SlaveRole import SlaveRole -from ...managers import Manager -from ...utils.crypto.type import CipherType +from source.behaviors import roles +from source.behaviors.roles import base class UndefinedRole(base.BaseRole): """ - Role used when the peer have no defined state, for example just after starting. - It looks for the current network peers state and try to integrate itself inside. + Role used when the machine is looking for how it should insert itself in the network """ - def __init__(self, manager: "Manager"): - super().__init__(manager) - - self.old_peers: list[packets.PeerPacket] = [] - self.previous_discovery: datetime = datetime.now() - - def handle(self): - # discover new peers - packet = packets.DiscoveryPacket() - self.manager.communication.broadcast(packet, CipherType.PLAIN) - - # wait for new messages - time.sleep(1) - - # check if a new peer have been registered - if self.manager.peers != self.old_peers: - self.old_peers = self.manager.peers.copy() - self.previous_discovery = datetime.now() - - # check if no more peers have been found in the previous seconds - if datetime.now() - self.previous_discovery >= timedelta(seconds=5): + def handle(self) -> None: + # check if no more peers have been found + if datetime.now() - self.manager.peer.peers.last_added >= timedelta(seconds=5): # SCENARIO 1 - empty network # filter ourselves out of the remote peers remote_peers = { address: peer - for (address, peer) in self.manager.peers.items() + for (address, peer) in self.manager.peer.peers.items() if not self.manager.communication.is_address_local(address) } # if no other peers have been found if len(remote_peers) == 0: + # TODO(Faraphel): do not change the role if we already have it !!!!!! return to using undefined role just for this whole if ? # declare ourselves as the master of the network - self.manager.role.current = MasterRole(self.manager) + self.manager.role.current = roles.MasterRole(self.manager) return # SCENARIO 2 - network with a master @@ -65,9 +43,12 @@ class UndefinedRole(base.BaseRole): master_address, master_peer = master_peers[0] # declare ourselves as a slave of the network - self.manager.role.current = SlaveRole(self.manager, master_address) + self.manager.role.current = roles.SlaveRole(self.manager, master_address) # SCENARIO 3 - network with no master # TODO(Faraphel): elect the machine with the lowest ping in the network - raise NotImplementedError("Not implemented: elect the machine with the lowest ping as a master.") \ No newline at end of file + raise NotImplementedError("Not implemented: elect the machine with the lowest ping as a master.") + + # TODO(Faraphel): calculate the expected date of the condition and wait for it + time.sleep(1) diff --git a/source/behaviors/roles/__init__.py b/source/behaviors/roles/__init__.py index c51027f..ff074ae 100644 --- a/source/behaviors/roles/__init__.py +++ b/source/behaviors/roles/__init__.py @@ -2,4 +2,4 @@ from . import base from .MasterRole import MasterRole from .SlaveRole import SlaveRole -from .UndefinedRole import UndefinedRole +from .UndefinedRole import UndefinedRole \ No newline at end of file diff --git a/source/managers/AudioManager.py b/source/managers/AudioManager.py index f42d1e4..0572811 100644 --- a/source/managers/AudioManager.py +++ b/source/managers/AudioManager.py @@ -12,6 +12,10 @@ from source.utils.audio.audio import sample_width_to_type class AudioManager: + """ + Manage playing audio data in the buffer + """ + def __init__(self, manager: "managers.Manager"): self.stream: typing.Optional[sounddevice.OutputStream] = None diff --git a/source/managers/CommunicationManager.py b/source/managers/CommunicationManager.py index c30e96f..607d560 100644 --- a/source/managers/CommunicationManager.py +++ b/source/managers/CommunicationManager.py @@ -1,10 +1,11 @@ import socket import typing import zlib +from datetime import datetime import bidict -from source import packets, utils +from source import packets, utils, structures from source.managers import Manager from source.utils.crypto.rsa import rsa_create_key_pair from source.utils.crypto.type import CipherType @@ -12,7 +13,7 @@ from source.utils.crypto.type import CipherType class CommunicationManager: """ - Manage everything about communication + Manage the communication between the peers """ def __init__(self, manager: "Manager", interface: str, broadcast_address: str = "ff02::1", port: int = 5555): @@ -37,7 +38,7 @@ class CommunicationManager: self.private_key, self.public_key = rsa_create_key_pair() # TODO(Faraphel): should be decided by the server when changing role, stored somewhere else - self.secret_key: bytes = b"secret key!".zfill(32) + self.secret_key: typing.Optional[bytes] = None def __del__(self): # close the socket @@ -169,6 +170,13 @@ class CommunicationManager: # receive a message payload, address = self.socket.recvfrom(65536) + + # check if there is a peer associated with this address + peer: structures.Peer = self.manager.peer.peers.get(address) + if peer is not None: + # update the latest interaction date + peer.last_interaction = datetime.now() + # decode the payload return self.packet_decode(payload), address diff --git a/source/managers/EventManager.py b/source/managers/EventManager.py index d200edb..a3ddf35 100644 --- a/source/managers/EventManager.py +++ b/source/managers/EventManager.py @@ -9,7 +9,6 @@ from source.managers import Manager class EventManager: """ - Event Manager Responsible for receiving packets from other peers and handling them. """ diff --git a/source/managers/Manager.py b/source/managers/Manager.py index 89a7722..1cc494c 100644 --- a/source/managers/Manager.py +++ b/source/managers/Manager.py @@ -1,7 +1,8 @@ import threading -from source import packets +from source import packets, structures from source.behaviors import events +from source.utils.dict import TimestampedDict class Manager: @@ -10,7 +11,7 @@ class Manager: """ def __init__(self, interface: str): - from . import CommunicationManager, EventManager, RoleManager, AudioManager + from . import CommunicationManager, EventManager, RoleManager, AudioManager, PeerManager # communication manager self.communication = CommunicationManager(self, interface) @@ -34,23 +35,26 @@ class Manager: # audio manager self.audio = AudioManager(self) - # set of addresses associated to their peer - self.peers: dict[tuple, packets.PeerPacket] = {} + # peer manager + self.peer = PeerManager(self) def loop(self) -> None: """ - Handle the event and role managers forever + Handle the sub-managers forever """ # run a thread for the event and the role manager event_thread = threading.Thread(target=self.event.loop) role_thread = threading.Thread(target=self.role.loop) audio_thread = threading.Thread(target=self.audio.loop) + peer_thread = threading.Thread(target=self.peer.loop) event_thread.start() role_thread.start() audio_thread.start() + peer_thread.start() event_thread.join() role_thread.join() audio_thread.join() + peer_thread.join() diff --git a/source/managers/PeerManager.py b/source/managers/PeerManager.py new file mode 100644 index 0000000..aff89b9 --- /dev/null +++ b/source/managers/PeerManager.py @@ -0,0 +1,30 @@ +import time + +from source import packets, structures +from source.managers import Manager +from source.utils.crypto.type import CipherType +from source.utils.dict import TimestampedDict + + +class PeerManager: + """ + Manage the peers network + """ + + def __init__(self, manager: "Manager"): + self.manager = manager + + # set of addresses associated to their peer + self.peers: TimestampedDict[tuple, structures.Peer] = TimestampedDict() + + def handle(self) -> None: + # send requests to discover new peers + packet = packets.DiscoveryPacket() + self.manager.communication.broadcast(packet, CipherType.PLAIN) + + def loop(self) -> None: + while True: + self.handle() + + # TODO(Faraphel): adjust sleep time ? as much seconds as there are peer to avoid flooding the network ? + time.sleep(1) diff --git a/source/managers/__init__.py b/source/managers/__init__.py index 40dc25e..620eae0 100644 --- a/source/managers/__init__.py +++ b/source/managers/__init__.py @@ -2,5 +2,6 @@ from .CommunicationManager import CommunicationManager from .EventManager import EventManager from .RoleManager import RoleManager from .AudioManager import AudioManager +from .PeerManager import PeerManager from .Manager import Manager diff --git a/source/structures/Peer.py b/source/structures/Peer.py new file mode 100644 index 0000000..09ea1ff --- /dev/null +++ b/source/structures/Peer.py @@ -0,0 +1,14 @@ +import dataclasses +from datetime import datetime + + +@dataclasses.dataclass +class Peer: + # public asymmetric key + public_key: bytes = dataclasses.field() + + # is the peer a master + master: bool = dataclasses.field() + + # when did the peer last communication with us occurred + last_interaction: datetime = dataclasses.field(default_factory=datetime.now) diff --git a/source/structures/__init__.py b/source/structures/__init__.py new file mode 100644 index 0000000..1912dc6 --- /dev/null +++ b/source/structures/__init__.py @@ -0,0 +1 @@ +from .Peer import Peer diff --git a/source/utils/dict/TimestampedDict.py b/source/utils/dict/TimestampedDict.py new file mode 100644 index 0000000..6cab81b --- /dev/null +++ b/source/utils/dict/TimestampedDict.py @@ -0,0 +1,42 @@ +import collections +from datetime import datetime + + +class TimestampedDict(collections.UserDict): + """ + A dictionary with additional metadata + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) # NOQA + + self._last_modified = datetime.now() # last time a value got modified + self._last_added = datetime.now() # last time a new value have been added + + def __setitem__(self, key, value): + # if the key is already used, we only update a value + update = key in self + # set the value + super().__setitem__(key, value) + + # update modification time + self._last_modified = datetime.now() + # if this is not an update, set the added time + if not update: + self._last_added = datetime.now() + + def __delitem__(self, key): + super().__delitem__(key) + self._last_modified = datetime.now() + + def update(self, *args, **kwargs): + super().update(*args, **kwargs) # NOQA + self._last_modified = datetime.now() + + @property + def last_modified(self): + return self._last_modified + + @property + def last_added(self): + return self._last_added \ No newline at end of file diff --git a/source/utils/dict/__init__.py b/source/utils/dict/__init__.py new file mode 100644 index 0000000..3bb64fb --- /dev/null +++ b/source/utils/dict/__init__.py @@ -0,0 +1 @@ +from .TimestampedDict import TimestampedDict