M2-PT-DRP/source/managers/AudioManager.py
study-faraphel 9480339b89 improved error handling
instead of having a component crashing, it will issue a warning with the error
2025-02-02 23:31:06 +01:00

105 lines
No EOL
3.1 KiB
Python

import threading
import traceback
import typing
import warnings
from datetime import datetime
import numpy
import pause
import sortedcontainers
import sounddevice
from source import packets, managers
from source.utils.audio.audio import sample_width_to_type
class AudioManager:
"""
Manage playing audio data in the buffer
"""
def __init__(self, manager: "managers.Manager"):
self.stream: typing.Optional[sounddevice.OutputStream] = None
# buffer containing the set of audio chunk to play. Sort them by their time to play
self.buffer = sortedcontainers.SortedList(key=lambda audio: audio.time)
# thread support
self.lock = threading.Lock()
self.new_audio_event = threading.Event() # event triggered when a new audio have been added
def add_audio(self, audio: packets.AudioPacket) -> None:
"""
Add a new audio chunk to play
:param audio: the audio chunk to play
"""
with self.lock:
# add the audio packet to the buffer
self.buffer.add(audio)
# trigger the new audio event
self.new_audio_event.set()
def play_audio(self, audio: packets.AudioPacket) -> None:
# create a numpy array for our sample
sample = numpy.frombuffer(audio.data, dtype=sample_width_to_type(audio.sample_width))
# reshape it to have a sub-array for each channels
sample = sample.reshape((-1, audio.channels))
# normalize the sample to be between -1 and 1
sample = sample / (2 ** (audio.sample_width * 8 - 1))
# use float32 for the audio library
sample = sample.astype(numpy.float32)
# wait for the audio given time
pause.until(audio.time)
# update the stream if the audio use different settings
if (
self.stream is None or
self.stream.samplerate != audio.sample_rate or
self.stream.channels != audio.channels
):
self.stream = sounddevice.OutputStream(
samplerate=audio.sample_rate,
channels=audio.channels,
)
# play
self.stream.start()
# write the audio to the stream
self.stream.write(sample)
def handle(self) -> None:
"""
Play the audio chunk in the buffer at the given time
"""
# wait for a new audio packet
# TODO(Faraphel): use self.lock ? seem to softlock the application
if len(self.buffer) == 0:
self.new_audio_event.clear()
self.new_audio_event.wait()
# get the most recent audio packet to play
audio: packets.AudioPacket = self.buffer.pop(0)
# if the audio should have been played before, skip it
if audio.time < datetime.now():
return
# play the audio packet
self.play_audio(audio)
def loop(self) -> None:
"""
Handle forever
"""
while True:
try:
self.handle()
except Exception: # NOQA
warnings.warn(traceback.format_exc())