import threading
import typing
from datetime import datetime

import numpy
import pause
import sortedcontainers
import sounddevice

from source import packets, managers
from source.utils.audio.audio import sample_width_to_type


class AudioManager:
    """
    Manage playing audio data in the buffer

    Audio packets are queued with :meth:`add_audio` (from any thread) and consumed by
    :meth:`loop` / :meth:`handle`, which play each packet at the time it carries.
    """

    def __init__(self, manager: "managers.Manager"):
        """
        :param manager: the manager owning this audio manager (not used by the code visible here)
        """

        # output stream currently opened; created lazily by the first packet played
        self.stream: typing.Optional[sounddevice.OutputStream] = None

        # buffer containing the set of audio chunks to play, sorted by their time to play
        self.buffer = sortedcontainers.SortedList(key=lambda audio: audio.time)

        # thread support
        self.lock = threading.Lock()  # protect every access to the buffer
        self.new_audio_event = threading.Event()  # event triggered when a new audio has been added

    def add_audio(self, audio: packets.AudioPacket) -> None:
        """
        Add a new audio chunk to play
        :param audio: the audio chunk to play
        """

        with self.lock:
            # add the audio packet to the buffer
            self.buffer.add(audio)
            # trigger the new audio event while still holding the lock, so the consumer
            # can never observe the packet without also observing the event (or vice versa)
            self.new_audio_event.set()

    def _ensure_stream(self, audio: packets.AudioPacket) -> None:
        """
        Make sure the output stream matches the settings of the given audio chunk
        :param audio: the audio chunk about to be played
        """

        stream = self.stream
        if (
            stream is not None
            and stream.samplerate == audio.sample_rate
            and stream.channels == audio.channels
        ):
            # the current stream can be reused as is
            return

        if stream is not None:
            # release the previous stream instead of leaking it.
            # stop() lets the already queued audio finish before close() frees the device
            stream.stop()
            stream.close()

        self.stream = sounddevice.OutputStream(
            samplerate=audio.sample_rate,
            channels=audio.channels,
        )
        # play
        self.stream.start()

    def play_audio(self, audio: packets.AudioPacket) -> None:
        """
        Decode an audio chunk and play it at its given time
        :param audio: the audio chunk to play
        """

        # create a numpy array for our sample
        sample = numpy.frombuffer(audio.data, dtype=sample_width_to_type(audio.sample_width))
        # reshape it to have a sub-array for each channel
        sample = sample.reshape((-1, audio.channels))
        # normalize the sample to be between -1 and 1
        # NOTE(review): this scaling assumes signed integer samples - confirm against sample_width_to_type
        sample = sample / (2 ** (audio.sample_width * 8 - 1))
        # use float32 for the audio library
        sample = sample.astype(numpy.float32)

        # wait for the audio given time
        pause.until(audio.time)

        # update the stream if the audio uses different settings
        self._ensure_stream(audio)

        # write the audio to the stream
        self.stream.write(sample)

    def handle(self) -> None:
        """
        Play the next audio chunk of the buffer at its given time

        Block until a chunk is available. A chunk whose time is already in the past is dropped.
        """

        while True:
            with self.lock:
                if self.buffer:
                    # get the earliest audio packet to play
                    audio: packets.AudioPacket = self.buffer.pop(0)
                    break

                # nothing to play: reset the event under the lock, so that a packet added
                # right after the emptiness check cannot have its notification erased
                self.new_audio_event.clear()

            # wait outside the lock: add_audio() needs it to insert the packet,
            # holding it here is what would softlock the application
            self.new_audio_event.wait()

        # if the audio should have been played before, skip it
        # NOTE(review): assumes audio.time is a naive local datetime like datetime.now() - confirm
        if audio.time < datetime.now():
            return

        # play the audio packet
        self.play_audio(audio)

    def loop(self) -> None:
        """
        Handle forever
        """

        while True:
            self.handle()