master now emit audio in the network and others machines play it at the given time.

This commit is contained in:
study-faraphel 2025-01-04 13:30:27 +01:00
parent 142d0f6db8
commit 077d1d3d9d
15 changed files with 302 additions and 30 deletions

View file

@ -1,4 +1,16 @@
sounddevice
msgpack
cryptography
# extended standard
bidict
pause
sortedcontainers
numpy
# networking
msgpack
# cryptography
cryptography
# audio
pydub
audioop-lts
sounddevice

View file

@ -0,0 +1,12 @@
from source import packets
from source.behaviors.events import base
class AudioEvent(base.BaseEvent):
"""
Event reacting to receiving audio data.
"""
def handle(self, packet: packets.AudioPacket, address: tuple):
# add the audio chunk to the buffer of audio packet to play
self.manager.audio.add_audio(packet)

View file

@ -2,3 +2,4 @@ from . import base
from .DiscoveryEvent import DiscoveryEvent
from .PeerEvent import PeerEvent
from .AudioEvent import AudioEvent

View file

@ -1,6 +1,13 @@
import time
import os
from datetime import datetime, timedelta
import pause
import pydub
from source.behaviors.roles import base
from source.managers import Manager
from source.packets import AudioPacket
from source.utils.crypto.type import CipherType
class MasterRole(base.BaseRole):
@ -9,7 +16,47 @@ class MasterRole(base.BaseRole):
It will be the machine responsible for emitting data that the others peers should play together.
"""
def handle(self):
print("I am now the master !")
TARGET_SIZE: int = 60 * 1024 # set an upper bound because of the IPv6 maximum packet size.
time.sleep(1)
def __init__(self, manager: "Manager"):
super().__init__(manager)
# generate a random secret key for AES communication
self.manager.communication.secret_key = os.urandom(32)
# prepare the audio file that will be streamed
self.audio = pydub.AudioSegment.from_file("../assets/Caravan Palace - Wonderland.mp3")
self.play_time = datetime.now()
def handle(self):
# TODO(Faraphel): check if another server is emitting sound in the network. Return to undefined if yes
# calculate the number of bytes per milliseconds in the audio
bytes_per_ms = self.audio.frame_rate * self.audio.sample_width * self.audio.channels / 1000
# calculate the required chunk duration to reach that size
chunk_duration = timedelta(milliseconds=self.TARGET_SIZE / bytes_per_ms)
# calculate the audio time
chunk_start_time = datetime.now() - self.play_time
chunk_end_time = chunk_start_time + chunk_duration
# get the music for that period
chunk = self.audio[
chunk_start_time.total_seconds() * 1000 :
chunk_end_time.total_seconds() * 1000
]
# broadcast it in the network
audio_packet = AudioPacket(
datetime.now() + timedelta(seconds=5), # play it in some seconds to let all the machines get the sample
chunk.channels,
chunk.frame_rate,
chunk.sample_width,
chunk.raw_data,
)
self.manager.communication.broadcast(audio_packet, CipherType.AES_CBC)
# wait for the audio to play
# TODO(Faraphel): should adapt to the compute time above
pause.until(self.play_time + chunk_end_time)

View file

@ -0,0 +1,11 @@
from source.behaviors.roles import base
class SlaveRole(base.BaseRole):
"""
Role used when the machine is declared as a slave.
It shall listen for a master and check if everything is working properly
"""
def handle(self):
# TODO(Faraphel): ping the server and check if it is working properly. Return to undefined if no.
pass

View file

@ -0,0 +1,68 @@
import threading
from datetime import datetime
import numpy
import pause
import sortedcontainers
import sounddevice
from source import packets, managers
from source.utils.audio.audio import sample_width_to_type
class AudioManager:
def __init__(self, manager: "managers.Manager"):
# buffer containing the set of audio chunk to play. Sort them by their time to play
self.buffer = sortedcontainers.SortedList(key=lambda audio: audio.time)
# thread support
self.lock = threading.Lock()
self.new_audio_event = threading.Event() # event triggered when a new audio have been added
def add_audio(self, audio: packets.AudioPacket) -> None:
"""
Add a new audio chunk to play
:param audio: the audio chunk to play
"""
with self.lock:
# add the audio packet to the buffer
self.buffer.add(audio)
# trigger the new audio event
self.new_audio_event.set()
def handle(self) -> None:
"""
Play the audio chunk in the buffer at the given time
"""
# wait for a new audio packet
self.new_audio_event.wait()
# get the most recent audio packet to play
audio: packets.AudioPacket = self.buffer.pop(0)
# if the audio should have been played before, skip it
if audio.time < datetime.now():
return
# create a numpy array for our sample
sample = numpy.frombuffer(audio.data, dtype=sample_width_to_type(numpy.int16))
# reshape it to have a sub-array for each channels
sample = sample.reshape((-1, audio.channels))
# normalize the sample to be between -1 and 1
sample = sample / (2 ** (audio.sample_width * 8 - 1))
# wait for the audio given time
pause.until(audio.time)
# play the audio
sounddevice.play(sample, audio.sample_rate)
def loop(self) -> None:
"""
Handle forever
"""
while True:
self.handle()

View file

@ -37,7 +37,7 @@ class CommunicationManager:
self.private_key, self.public_key = rsa_create_key_pair()
# TODO(Faraphel): should be decided by the server when changing role, stored somewhere else
self.secret_key: bytes = b"secret key!"
self.secret_key: bytes = b"secret key!".zfill(32)
def __del__(self):
# close the socket
@ -82,12 +82,15 @@ class CommunicationManager:
case CipherType.PLAIN:
pass
case CipherType.RSA:
packet_data = utils.crypto.rsa.rsa_encrypt(packet_data, self.public_key)
case CipherType.AES_ECB:
packet_data = utils.crypto.aes.aes_ecb_encrypt(packet_data, self.secret_key)
case CipherType.AES_CBC:
packet_data = utils.crypto.aes.aes_cbc_encrypt(packet_data, self.secret_key)
case CipherType.RSA:
packet_data = utils.crypto.rsa.rsa_encrypt(packet_data, self.public_key)
case _:
raise ValueError(f"Unknown cipher: {cipher_type}")
@ -112,12 +115,15 @@ class CommunicationManager:
case CipherType.PLAIN:
pass
case CipherType.RSA:
packet_data = utils.crypto.rsa.rsa_decrypt(packet_data, self.private_key)
case CipherType.AES_ECB:
packet_data = utils.crypto.aes.aes_ecb_decrypt(packet_data, self.secret_key)
case CipherType.AES_CBC:
packet_data = utils.crypto.aes.aes_cbc_decrypt(packet_data, self.secret_key)
case CipherType.RSA:
packet_data = utils.crypto.rsa.rsa_decrypt(packet_data, self.private_key)
case _:
raise ValueError(f"Unknown cipher: {cipher_type}")

View file

@ -1,3 +1,4 @@
import traceback
import typing
import warnings
@ -56,7 +57,7 @@ class EventManager:
self.manager.event.handle(packet, address)
except KeyboardInterrupt:
print("Stopping listener.")
break
except Exception as exception:
warnings.warn(str(exception))
except:
warnings.warn(traceback.format_exc())

View file

@ -10,21 +10,26 @@ class Manager:
"""
def __init__(self, interface: str):
from . import CommunicationManager, EventManager, RoleManager
from . import CommunicationManager, EventManager, RoleManager, AudioManager
# communication manager
self.communication = CommunicationManager(self, interface)
self.communication.register_packet_type(b"DISC", packets.DiscoveryPacket)
self.communication.register_packet_type(b"PEER", packets.PeerPacket)
self.communication.register_packet_type(b"AUDI", packets.AudioPacket)
# event manager
self.event = EventManager(self)
self.event.register_event_handler(packets.DiscoveryPacket, events.DiscoveryEvent(self))
self.event.register_event_handler(packets.PeerPacket, events.PeerEvent(self))
self.event.register_event_handler(packets.AudioPacket, events.AudioEvent(self))
# role manager
self.role = RoleManager(self)
# audio manager
self.audio = AudioManager(self)
# set of addresses associated to their peer
self.peers: dict[tuple, packets.PeerPacket] = {}
@ -36,9 +41,12 @@ class Manager:
# run a thread for the event and the role manager
event_thread = threading.Thread(target=self.event.loop)
role_thread = threading.Thread(target=self.role.loop)
audio_thread = threading.Thread(target=self.audio.loop)
event_thread.start()
role_thread.start()
audio_thread.start()
event_thread.join()
role_thread.join()
audio_thread.join()

View file

@ -1,5 +1,6 @@
from .CommunicationManager import CommunicationManager
from .EventManager import EventManager
from .RoleManager import RoleManager
from .AudioManager import AudioManager
from .Manager import Manager

View file

@ -1,4 +1,6 @@
import dataclasses
import zlib
from datetime import datetime
import msgpack
@ -11,20 +13,50 @@ class AudioPacket(base.BasePacket):
Represent a packet of audio data
"""
data: bytes = dataclasses.field()
# expected time to play the audio
time: datetime = dataclasses.field()
rate: int = dataclasses.field()
# audio details
channels: int = dataclasses.field()
encoding: int = dataclasses.field()
sample_rate: int = dataclasses.field()
sample_width: int = dataclasses.field()
# raw audio data
_data: bytes = dataclasses.field(repr=False)
# is the audio zlib compressed
compressed: bool = dataclasses.field(default=False)
def pack(self) -> bytes:
return msgpack.packb((
self.data,
self.rate,
self.time.timestamp(),
self.channels,
self.encoding
self.sample_rate,
self.sample_width,
self._data,
self.compressed
))
def __post_init__(self):
# if the audio is not compressed, compress it
if not self.compressed:
self._data = zlib.compress(self._data)
self.compressed = True
@property
def data(self):
return zlib.decompress(self._data) if self.compressed else self._data
@classmethod
def unpack(cls, data: bytes):
return cls(*msgpack.unpackb(data))
# unpack the message
timestamp, channels, sample_rate, sample_width, data, compressed = msgpack.unpackb(data)
return cls(
datetime.fromtimestamp(timestamp),
channels,
sample_rate,
sample_width,
data,
compressed,
)

View file

View file

@ -0,0 +1,19 @@
import numpy
def sample_width_to_type(sample_width: int):
"""
Return the numpy type to use depending on the sample width used in an audio sample
:param sample_width: the sample width
:return: the corresponding numpy type
"""
match sample_width:
case 1:
return numpy.int8
case 2:
return numpy.int16
case 4:
return numpy.int32
case _:
return numpy.int16

View file

@ -1,3 +1,5 @@
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
@ -24,7 +26,6 @@ def aes_ecb_encrypt(data: bytes, key: bytes) -> bytes:
return encrypted_data
def aes_ecb_decrypt(encrypted_data: bytes, key: bytes) -> bytes:
"""
Decrypt data encrypted with AES in CBC mode.
@ -45,3 +46,54 @@ def aes_ecb_decrypt(encrypted_data: bytes, key: bytes) -> bytes:
data = unpadder.update(decrypted_data) + unpadder.finalize()
return data
def aes_cbc_encrypt(data: bytes, key: bytes) -> bytes:
"""
Encrypt the message using AES in CBC mode.
:param data: the data to cipher
:param key: the key to use for the cipher
:return: the encrypted data
"""
# pad the data with PKCS7 for AES to work properly
padder = padding.PKCS7(128).padder()
padded_data = padder.update(data) + padder.finalize()
# create an initialisation vector
iv = os.urandom(16)
# create the AES cipher in CBC mode
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
# encrypt the padded data
encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
# prepend the iv to the encrypted data
return iv + encrypted_data
def aes_cbc_decrypt(payload: bytes, key: bytes) -> bytes:
"""
Decrypt data encrypted with AES in CBC mode.
:param payload: the encrypted data
:param key: the key used to encrypt it
:return: the decrypted data
"""
# split the payload into the iv and the encrypted data
iv = payload[:16]
encrypted_data = payload[16:]
# create the AES cipher in CBC mode
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
# decrypt the encrypted data
decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()
# unpad the data
unpadder = padding.PKCS7(128).unpadder()
data = unpadder.update(decrypted_data) + unpadder.finalize()
return data

View file

@ -4,14 +4,16 @@ import typing
class CipherType(enum.Enum):
PLAIN = 0x00
AES_ECB = 0x01
RSA = 0x02
AES_ECB = 0x01 # legacy
AES_CBC = 0x02
RSA = 0x10
CIPHER_SYMMETRIC_TYPES: typing.Final[list[CipherType]] = [
CipherType.PLAIN,
CipherType.AES_ECB
CipherType.AES_ECB,
CipherType.AES_CBC,
]
CIPHER_ASYMMETRIC_TYPES: typing.Final[list[CipherType]] = [
CipherType.RSA
CipherType.RSA,
]