M2-PT-DRP/source/packets/AudioPacket.py

62 lines
1.5 KiB
Python

import dataclasses
import zlib
from datetime import datetime
import msgpack
from source.packets import base
@dataclasses.dataclass
class AudioPacket(base.BasePacket):
    """
    Packet carrying a chunk of audio together with its playback metadata.

    The payload is held zlib-compressed: a packet created with
    ``compressed=False`` compresses its payload right after construction
    and flips the flag, while the ``data`` property hands the decompressed
    bytes back to the caller.
    """

    # moment at which the audio is expected to be played
    time: datetime

    # audio details
    channels: int
    sample_rate: int
    sample_width: int

    # audio payload as stored in the packet (left out of the repr)
    _data: bytes = dataclasses.field(repr=False)

    # whether ``_data`` is zlib compressed
    compressed: bool = False

    def __post_init__(self) -> None:
        """Compress the payload if it was supplied uncompressed."""
        if self.compressed:
            # payload is already flagged as compressed: nothing to do
            return

        self._data = zlib.compress(self._data)
        self.compressed = True

    @property
    def data(self) -> bytes:
        """The audio payload, decompressed when the packet is flagged as compressed."""
        if self.compressed:
            return zlib.decompress(self._data)
        return self._data

    def pack(self) -> bytes:
        """
        Serialize the packet with msgpack.

        The fields are written as a flat sequence, in the order expected by
        ``unpack``: timestamp, channels, sample rate, sample width, stored
        payload and compression flag.
        """
        fields = (
            self.time.timestamp(),
            self.channels,
            self.sample_rate,
            self.sample_width,
            self._data,
            self.compressed,
        )
        return msgpack.packb(fields)

    @classmethod
    def unpack(cls, data: bytes):
        """
        Rebuild a packet from the bytes produced by ``pack``.

        :param data: msgpack-encoded packet
        :return: a new instance of ``cls``
        """
        # the message is a flat sequence of six values, in ``pack`` order
        (
            timestamp,
            channels,
            sample_rate,
            sample_width,
            payload,
            is_compressed,
        ) = msgpack.unpackb(data)

        # arguments stay positional, exactly as the fields are declared
        return cls(
            datetime.fromtimestamp(timestamp),
            channels,
            sample_rate,
            sample_width,
            payload,
            is_compressed,
        )