import os
from datetime import datetime, timedelta

import numpy
import pause
import pydub
from pydub.utils import make_chunks

from source.behaviors.roles import base
from source.managers import Manager
from source.packets import AudioPacket
from source.utils.crypto.type import CipherType


class MasterRole(base.BaseActiveRole):
    """
    Role used when the machine is declared as the master.
    It will be the machine responsible for emitting data that the others peers should play together.

    On construction the audio file is loaded and pre-split into chunks whose raw
    size is close to ``TARGET_SIZE``. Each call to :meth:`handle` broadcasts one
    chunk and then blocks until the next chunk is due.
    """

    # target size (in bytes) of the raw audio carried by one packet.
    # set an upper bound because of the IPv6 maximum packet size.
    TARGET_SIZE: int = 60 * 1024

    def __init__(self, manager: "Manager"):
        """
        Load the audio source, compute the chunk duration and split the audio.

        :param manager: the manager owning this role
        :raises ValueError: if the audio source does not produce any chunk
        """

        super().__init__(manager)

        # generate a random secret key for symmetric communication
        self.secret_key = os.urandom(32)

        # prepare the audio file that will be streamed
        # TODO(Faraphel): use another audio source
        self.audio = pydub.AudioSegment.from_file("./assets/Queen - Another One Bites the Dust.mp3")
        # reference instant from which the emission time of every chunk is derived
        self.play_time = datetime.now()

        # calculate the number of bytes per milliseconds in the audio
        bytes_per_ms = self.audio.frame_rate * self.audio.sample_width * self.audio.channels / 1000
        # calculate the required chunk duration to reach that size
        self.chunk_duration = timedelta(milliseconds=self.TARGET_SIZE / bytes_per_ms)

        # latency of the audio data (only ever raised by handle(), never lowered)
        self.latency: timedelta = timedelta(seconds=0)

        # split the audio into chunks
        self.chunk_count = 0
        self.chunks = make_chunks(self.audio, self.chunk_duration.total_seconds() * 1000)

        # handle() indexes the chunks modulo their count: fail early with a clear
        # message instead of a ZeroDivisionError on the first emission.
        if not self.chunks:
            raise ValueError("The audio source is empty: there is no chunk to stream.")

    def handle(self) -> None:
        """
        Emit the next audio chunk to the network, then wait until the next one is due.

        If more than one peer is flagged as master, the role is switched back to
        ``UndefinedRole`` and nothing is emitted. This call blocks (``pause.until``)
        until the emission time of the following chunk.
        """

        peers = self.manager.peer.peers

        # count all the master peers in the network
        master_count = sum(1 for peer in peers.values() if peer.master)

        # if there is more than one master, return to undefined
        if master_count > 1:
            # there is a conflict: give up the master role.
            # imported here rather than at module level, presumably to avoid a circular import
            from source.behaviors.roles import UndefinedRole

            self.manager.role.current = UndefinedRole(self.manager)
            return

        # get the current chunk (loop indefinitely)
        chunk = self.chunks[self.chunk_count % len(self.chunks)]

        # collect the known latencies of the network (peers without a measurement are ignored)
        latencies: list[timedelta] = [
            latency
            for latency in (peer.latency for peer in peers.values())
            if latency is not None
        ]

        if len(latencies) <= 1:
            # if there is not enough data, default to 1 second of latency
            latency = timedelta(seconds=1)
        else:
            # otherwise use the maximum latency of the network, plus a safety margin
            latency = max(latencies) + timedelta(seconds=0.5)

        # if the calculated latency is higher than the current one, update it
        if latency > self.latency:
            self.latency = latency

        # broadcast it in the network
        audio_packet = AudioPacket(
            # TODO(Faraphel): adjust time depending on the network average latency ?
            datetime.now() + self.latency,  # play it in some seconds to let all the machines get the sample
            chunk.channels,
            chunk.frame_rate,
            chunk.sample_width,
            chunk.raw_data,
        )
        self.manager.communication.broadcast(audio_packet, CipherType.AES_CBC)

        # increment the chunk count
        self.chunk_count += 1

        # wait for the next chunk time
        pause.until(self.play_time + (self.chunk_duration * self.chunk_count))