diff --git a/requirements.txt b/requirements.txt index 5b5d990..b1f9d61 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,16 @@ -sounddevice -msgpack -cryptography +# extended standard bidict +pause +sortedcontainers +numpy + +# networking +msgpack + +# cryptography +cryptography + +# audio +pydub +audioop-lts +sounddevice \ No newline at end of file diff --git a/source/behaviors/events/AudioEvent.py b/source/behaviors/events/AudioEvent.py new file mode 100644 index 0000000..3507dbc --- /dev/null +++ b/source/behaviors/events/AudioEvent.py @@ -0,0 +1,12 @@ +from source import packets +from source.behaviors.events import base + + +class AudioEvent(base.BaseEvent): + """ + Event reacting to receiving audio data. + """ + + def handle(self, packet: packets.AudioPacket, address: tuple): + # add the audio chunk to the buffer of audio packet to play + self.manager.audio.add_audio(packet) diff --git a/source/behaviors/events/__init__.py b/source/behaviors/events/__init__.py index a3276f6..eb5f395 100644 --- a/source/behaviors/events/__init__.py +++ b/source/behaviors/events/__init__.py @@ -2,3 +2,4 @@ from . import base from .DiscoveryEvent import DiscoveryEvent from .PeerEvent import PeerEvent +from .AudioEvent import AudioEvent \ No newline at end of file diff --git a/source/behaviors/roles/MasterRole.py b/source/behaviors/roles/MasterRole.py index f27d2d3..c4a1538 100644 --- a/source/behaviors/roles/MasterRole.py +++ b/source/behaviors/roles/MasterRole.py @@ -1,6 +1,13 @@ -import time +import os +from datetime import datetime, timedelta + +import pause +import pydub from source.behaviors.roles import base +from source.managers import Manager +from source.packets import AudioPacket +from source.utils.crypto.type import CipherType class MasterRole(base.BaseRole): @@ -9,7 +16,47 @@ class MasterRole(base.BaseRole): It will be the machine responsible for emitting data that the others peers should play together. """ - def handle(self): - print("I am now the master !") + TARGET_SIZE: int = 60 * 1024 # set an upper bound because of the IPv6 maximum packet size. - time.sleep(1) \ No newline at end of file + def __init__(self, manager: "Manager"): + super().__init__(manager) + + # generate a random secret key for AES communication + self.manager.communication.secret_key = os.urandom(32) + + # prepare the audio file that will be streamed + self.audio = pydub.AudioSegment.from_file("../assets/Caravan Palace - Wonderland.mp3") + self.play_time = datetime.now() + + + def handle(self): + # TODO(Faraphel): check if another server is emitting sound in the network. Return to undefined if yes + + # calculate the number of bytes per milliseconds in the audio + bytes_per_ms = self.audio.frame_rate * self.audio.sample_width * self.audio.channels / 1000 + # calculate the required chunk duration to reach that size + chunk_duration = timedelta(milliseconds=self.TARGET_SIZE / bytes_per_ms) + + # calculate the audio time + chunk_start_time = datetime.now() - self.play_time + chunk_end_time = chunk_start_time + chunk_duration + + # get the music for that period + chunk = self.audio[ + chunk_start_time.total_seconds() * 1000 : + chunk_end_time.total_seconds() * 1000 + ] + + # broadcast it in the network + audio_packet = AudioPacket( + datetime.now() + timedelta(seconds=5), # play it in some seconds to let all the machines get the sample + chunk.channels, + chunk.frame_rate, + chunk.sample_width, + chunk.raw_data, + ) + self.manager.communication.broadcast(audio_packet, CipherType.AES_CBC) + + # wait for the audio to play + # TODO(Faraphel): should adapt to the compute time above + pause.until(self.play_time + chunk_end_time) diff --git a/source/behaviors/roles/SlaveRole.py b/source/behaviors/roles/SlaveRole.py new file mode 100644 index 0000000..35d7349 --- /dev/null +++ b/source/behaviors/roles/SlaveRole.py @@ -0,0 +1,11 @@ +from source.behaviors.roles import base + +class SlaveRole(base.BaseRole): + """ + Role used when the machine is declared as a slave. + It shall listen for a master and check if everything is working properly + """ + + def handle(self): + # TODO(Faraphel): ping the server and check if it is working properly. Return to undefined if no. + pass diff --git a/source/managers/AudioManager.py b/source/managers/AudioManager.py new file mode 100644 index 0000000..c3c5654 --- /dev/null +++ b/source/managers/AudioManager.py @@ -0,0 +1,68 @@ +import threading +from datetime import datetime + +import numpy +import pause +import sortedcontainers +import sounddevice + +from source import packets, managers +from source.utils.audio.audio import sample_width_to_type + + +class AudioManager: + def __init__(self, manager: "managers.Manager"): + # buffer containing the set of audio chunk to play. Sort them by their time to play + self.buffer = sortedcontainers.SortedList(key=lambda audio: audio.time) + + # thread support + self.lock = threading.Lock() + self.new_audio_event = threading.Event() # event triggered when a new audio have been added + + def add_audio(self, audio: packets.AudioPacket) -> None: + """ + Add a new audio chunk to play + :param audio: the audio chunk to play + """ + + with self.lock: + # add the audio packet to the buffer + self.buffer.add(audio) + # trigger the new audio event + self.new_audio_event.set() + + def handle(self) -> None: + """ + Play the audio chunk in the buffer at the given time + """ + + # wait for a new audio packet + self.new_audio_event.wait() + + # get the most recent audio packet to play + audio: packets.AudioPacket = self.buffer.pop(0) + + # if the audio should have been played before, skip it + if audio.time < datetime.now(): + return + + # create a numpy array for our sample + sample = numpy.frombuffer(audio.data, dtype=sample_width_to_type(numpy.int16)) + # reshape it to have a sub-array for each channels + sample = sample.reshape((-1, audio.channels)) + # normalize the sample to be between -1 and 1 + sample = sample / (2 ** (audio.sample_width * 8 - 1)) + + # wait for the audio given time + pause.until(audio.time) + + # play the audio + sounddevice.play(sample, audio.sample_rate) + + def loop(self) -> None: + """ + Handle forever + """ + + while True: + self.handle() diff --git a/source/managers/CommunicationManager.py b/source/managers/CommunicationManager.py index d2aa527..b6fece6 100644 --- a/source/managers/CommunicationManager.py +++ b/source/managers/CommunicationManager.py @@ -37,7 +37,7 @@ class CommunicationManager: self.private_key, self.public_key = rsa_create_key_pair() # TODO(Faraphel): should be decided by the server when changing role, stored somewhere else - self.secret_key: bytes = b"secret key!" + self.secret_key: bytes = b"secret key!".zfill(32) def __del__(self): # close the socket @@ -82,12 +82,15 @@ class CommunicationManager: case CipherType.PLAIN: pass - case CipherType.RSA: - packet_data = utils.crypto.rsa.rsa_encrypt(packet_data, self.public_key) - case CipherType.AES_ECB: packet_data = utils.crypto.aes.aes_ecb_encrypt(packet_data, self.secret_key) + case CipherType.AES_CBC: + packet_data = utils.crypto.aes.aes_cbc_encrypt(packet_data, self.secret_key) + + case CipherType.RSA: + packet_data = utils.crypto.rsa.rsa_encrypt(packet_data, self.public_key) + case _: raise ValueError(f"Unknown cipher: {cipher_type}") @@ -112,12 +115,15 @@ class CommunicationManager: case CipherType.PLAIN: pass - case CipherType.RSA: - packet_data = utils.crypto.rsa.rsa_decrypt(packet_data, self.private_key) - case CipherType.AES_ECB: packet_data = utils.crypto.aes.aes_ecb_decrypt(packet_data, self.secret_key) + case CipherType.AES_CBC: + packet_data = utils.crypto.aes.aes_cbc_decrypt(packet_data, self.secret_key) + + case CipherType.RSA: + packet_data = utils.crypto.rsa.rsa_decrypt(packet_data, self.private_key) + case _: raise ValueError(f"Unknown cipher: {cipher_type}") diff --git a/source/managers/EventManager.py b/source/managers/EventManager.py index ae97316..d200edb 100644 --- a/source/managers/EventManager.py +++ b/source/managers/EventManager.py @@ -1,3 +1,4 @@ +import traceback import typing import warnings @@ -56,7 +57,7 @@ class EventManager: self.manager.event.handle(packet, address) except KeyboardInterrupt: - print("Stopping listener.") + break - except Exception as exception: - warnings.warn(str(exception)) + except: + warnings.warn(traceback.format_exc()) diff --git a/source/managers/Manager.py b/source/managers/Manager.py index 1362b8f..1981e99 100644 --- a/source/managers/Manager.py +++ b/source/managers/Manager.py @@ -10,21 +10,26 @@ class Manager: """ def __init__(self, interface: str): - from . import CommunicationManager, EventManager, RoleManager + from . import CommunicationManager, EventManager, RoleManager, AudioManager # communication manager self.communication = CommunicationManager(self, interface) self.communication.register_packet_type(b"DISC", packets.DiscoveryPacket) self.communication.register_packet_type(b"PEER", packets.PeerPacket) + self.communication.register_packet_type(b"AUDI", packets.AudioPacket) # event manager self.event = EventManager(self) self.event.register_event_handler(packets.DiscoveryPacket, events.DiscoveryEvent(self)) self.event.register_event_handler(packets.PeerPacket, events.PeerEvent(self)) + self.event.register_event_handler(packets.AudioPacket, events.AudioEvent(self)) # role manager self.role = RoleManager(self) + # audio manager + self.audio = AudioManager(self) + # set of addresses associated to their peer self.peers: dict[tuple, packets.PeerPacket] = {} @@ -36,9 +41,12 @@ class Manager: # run a thread for the event and the role manager event_thread = threading.Thread(target=self.event.loop) role_thread = threading.Thread(target=self.role.loop) + audio_thread = threading.Thread(target=self.audio.loop) event_thread.start() role_thread.start() + audio_thread.start() event_thread.join() role_thread.join() + audio_thread.join() diff --git a/source/managers/__init__.py b/source/managers/__init__.py index c140338..40dc25e 100644 --- a/source/managers/__init__.py +++ b/source/managers/__init__.py @@ -1,5 +1,6 @@ from .CommunicationManager import CommunicationManager from .EventManager import EventManager from .RoleManager import RoleManager +from .AudioManager import AudioManager from .Manager import Manager diff --git a/source/packets/AudioPacket.py b/source/packets/AudioPacket.py index ce7c9af..d486ce8 100644 --- a/source/packets/AudioPacket.py +++ b/source/packets/AudioPacket.py @@ -1,4 +1,6 @@ import dataclasses +import zlib +from datetime import datetime import msgpack @@ -11,20 +13,50 @@ class AudioPacket(base.BasePacket): Represent a packet of audio data """ - data: bytes = dataclasses.field() + # expected time to play the audio + time: datetime = dataclasses.field() - rate: int = dataclasses.field() + # audio details channels: int = dataclasses.field() - encoding: int = dataclasses.field() + sample_rate: int = dataclasses.field() + sample_width: int = dataclasses.field() + + # raw audio data + _data: bytes = dataclasses.field(repr=False) + # is the audio zlib compressed + compressed: bool = dataclasses.field(default=False) + def pack(self) -> bytes: return msgpack.packb(( - self.data, - self.rate, + self.time.timestamp(), self.channels, - self.encoding + self.sample_rate, + self.sample_width, + self._data, + self.compressed )) + def __post_init__(self): + # if the audio is not compressed, compress it + if not self.compressed: + self._data = zlib.compress(self._data) + self.compressed = True + + @property + def data(self): + return zlib.decompress(self._data) if self.compressed else self._data + @classmethod def unpack(cls, data: bytes): - return cls(*msgpack.unpackb(data)) + # unpack the message + timestamp, channels, sample_rate, sample_width, data, compressed = msgpack.unpackb(data) + + return cls( + datetime.fromtimestamp(timestamp), + channels, + sample_rate, + sample_width, + data, + compressed, + ) diff --git a/source/utils/audio/__init__.py b/source/utils/audio/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/source/utils/audio/audio.py b/source/utils/audio/audio.py new file mode 100644 index 0000000..bc7ba93 --- /dev/null +++ b/source/utils/audio/audio.py @@ -0,0 +1,19 @@ +import numpy + + +def sample_width_to_type(sample_width: int): + """ + Return the numpy type to use depending on the sample width used in an audio sample + :param sample_width: the sample width + :return: the corresponding numpy type + """ + + match sample_width: + case 1: + return numpy.int8 + case 2: + return numpy.int16 + case 4: + return numpy.int32 + case _: + return numpy.int16 \ No newline at end of file diff --git a/source/utils/crypto/aes.py b/source/utils/crypto/aes.py index 49dd523..aaf989f 100644 --- a/source/utils/crypto/aes.py +++ b/source/utils/crypto/aes.py @@ -1,3 +1,5 @@ +import os + from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes @@ -24,7 +26,6 @@ def aes_ecb_encrypt(data: bytes, key: bytes) -> bytes: return encrypted_data - def aes_ecb_decrypt(encrypted_data: bytes, key: bytes) -> bytes: """ Decrypt data encrypted with AES in CBC mode. @@ -45,3 +46,54 @@ def aes_ecb_decrypt(encrypted_data: bytes, key: bytes) -> bytes: data = unpadder.update(decrypted_data) + unpadder.finalize() return data + +def aes_cbc_encrypt(data: bytes, key: bytes) -> bytes: + """ + Encrypt the message using AES in CBC mode. + :param data: the data to cipher + :param key: the key to use for the cipher + :return: the encrypted data + """ + + # pad the data with PKCS7 for AES to work properly + padder = padding.PKCS7(128).padder() + padded_data = padder.update(data) + padder.finalize() + + # create an initialisation vector + iv = os.urandom(16) + + # create the AES cipher in CBC mode + cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) + encryptor = cipher.encryptor() + + # encrypt the padded data + encrypted_data = encryptor.update(padded_data) + encryptor.finalize() + + # prepend the iv to the encrypted data + return iv + encrypted_data + + +def aes_cbc_decrypt(payload: bytes, key: bytes) -> bytes: + """ + Decrypt data encrypted with AES in CBC mode. + :param payload: the encrypted data + :param key: the key used to encrypt it + :return: the decrypted data + """ + + # split the payload into the iv and the encrypted data + iv = payload[:16] + encrypted_data = payload[16:] + + # create the AES cipher in CBC mode + cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) + decryptor = cipher.decryptor() + + # decrypt the encrypted data + decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize() + + # unpad the data + unpadder = padding.PKCS7(128).unpadder() + data = unpadder.update(decrypted_data) + unpadder.finalize() + + return data \ No newline at end of file diff --git a/source/utils/crypto/type.py b/source/utils/crypto/type.py index b9aa530..452f1bb 100644 --- a/source/utils/crypto/type.py +++ b/source/utils/crypto/type.py @@ -4,14 +4,16 @@ import typing class CipherType(enum.Enum): PLAIN = 0x00 - AES_ECB = 0x01 - RSA = 0x02 + AES_ECB = 0x01 # legacy + AES_CBC = 0x02 + RSA = 0x10 CIPHER_SYMMETRIC_TYPES: typing.Final[list[CipherType]] = [ CipherType.PLAIN, - CipherType.AES_ECB + CipherType.AES_ECB, + CipherType.AES_CBC, ] CIPHER_ASYMMETRIC_TYPES: typing.Final[list[CipherType]] = [ - CipherType.RSA + CipherType.RSA, ] \ No newline at end of file