M2-PT-DRP/source/behaviors/roles/MasterRole.py

94 lines
3.5 KiB
Python

import os
from datetime import datetime, timedelta
import numpy
import pause
import pydub
from pydub.utils import make_chunks
from source.behaviors.roles import base
from source.managers import Manager
from source.packets import AudioPacket
from source.utils.crypto.type import CipherType
class MasterRole(base.BaseActiveRole):
    """
    Role used when the machine is declared as the master.

    It will be the machine responsible for emitting data that the other peers should play together.
    The audio file is split once, at construction, into chunks whose raw size stays at or below
    ``TARGET_SIZE``. Each call to :meth:`handle` broadcasts one chunk and then blocks until the
    next chunk is due, unless another master is detected, in which case the role steps down.
    """

    # upper bound on the raw audio payload of a chunk, because of the IPv6 maximum packet size
    TARGET_SIZE: int = 60 * 1024
    # audio file streamed to the network
    # TODO(Faraphel): use another audio source
    AUDIO_PATH: str = "./assets/Roundabout 2008 Remaster.mp3"
    # playback delay used while fewer than two peer latencies are known
    DEFAULT_LATENCY: timedelta = timedelta(seconds=1)
    # margin added on top of the highest known peer latency
    LATENCY_MARGIN: timedelta = timedelta(seconds=0.5)

    def __init__(self, manager: "Manager"):
        """
        Load the audio file and split it into chunks ready to be broadcast.

        :param manager: the manager giving access to the peers, the current role and the communication layer
        :raises ValueError: if the audio file does not contain any audio data to stream
        """

        super().__init__(manager)

        # generate a random secret key for symmetric communication
        self.secret_key = os.urandom(32)

        # prepare the audio file that will be streamed
        self.audio = pydub.AudioSegment.from_file(self.AUDIO_PATH)
        # reference instant from which the emission time of every chunk is derived
        self.play_time = datetime.now()

        # calculate the number of bytes per millisecond in the audio
        bytes_per_ms = self.audio.frame_rate * self.audio.sample_width * self.audio.channels / 1000
        # calculate the chunk duration required to reach the target size
        self.chunk_duration = timedelta(milliseconds=self.TARGET_SIZE / bytes_per_ms)

        # playback delay applied to the broadcast audio; handle() only ever raises it
        self.latency: timedelta = timedelta(seconds=0)

        # split the audio into chunks (make_chunks expects the chunk length in milliseconds)
        self.chunk_count = 0
        self.chunks = make_chunks(self.audio, self.chunk_duration.total_seconds() * 1000)

        # fail early with a clear message: handle() indexes the chunks modulo their count,
        # which would otherwise raise a ZeroDivisionError on every call
        if not self.chunks:
            raise ValueError(f"The audio file {self.AUDIO_PATH!r} does not contain any audio data.")

    def handle(self) -> None:
        """
        Broadcast the next audio chunk, then wait until the following chunk is due.

        If more than one peer is declared as master, nothing is broadcast and the current role
        is replaced with an ``UndefinedRole``.
        """

        # count the peers currently declared as master in the network
        master_count = sum(1 for peer in self.manager.peer.peers.values() if peer.master)

        # if there is more than one master, step down and return to the undefined role
        if master_count > 1:
            # imported locally, as in the original code (presumably to avoid a circular import)
            from source.behaviors.roles import UndefinedRole
            self.manager.role.current = UndefinedRole(self.manager)
            return

        # get the current chunk (loop indefinitely over the audio)
        chunk = self.chunks[self.chunk_count % len(self.chunks)]

        # collect the known latencies of the network (each peer latency is read only once)
        latencies: list[timedelta] = [
            latency
            for latency in (peer.latency for peer in self.manager.peer.peers.values())
            if latency is not None
        ]

        # if there is not enough data, use the default latency;
        # otherwise take the highest known latency plus a margin
        if len(latencies) <= 1:
            latency = self.DEFAULT_LATENCY
        else:
            latency = max(latencies) + self.LATENCY_MARGIN

        # the applied latency never decreases: only update it when the new value is higher
        if latency > self.latency:
            self.latency = latency

        # broadcast the chunk in the network
        # NOTE(review): the play time is based on the current instant rather than on the chunk
        #  schedule, so jitter in the emission may be reflected in the playback - confirm intent
        audio_packet = AudioPacket(
            # TODO(Faraphel): adjust time depending on the network average latency ?
            datetime.now() + self.latency,  # play it later to let all the machines get the sample
            chunk.channels,
            chunk.frame_rate,
            chunk.sample_width,
            chunk.raw_data,
        )
        self.manager.communication.broadcast(audio_packet, CipherType.AES_CBC)

        # increment the chunk count
        self.chunk_count += 1

        # wait for the next chunk time, measured from the reference instant to avoid cumulative drift
        pause.until(self.play_time + (self.chunk_duration * self.chunk_count))