// M2-PT-DRP/source/Client.cpp (194 lines, 5.9 KiB, C++)

#include "Client.hpp"
#include <cstring>
#include <iostream>
#include <list>
#include <map>
#include <netdb.h>
#include <queue>
#include <stdexcept>
#include <thread>
#include <sys/socket.h>
#include "packets/AudioPacket.hpp"
/**
 * Build a client that has no audio stream opened yet.
 *
 * The cached stream settings (channels, sample format, rate) start at zero,
 * and the audio lock is created on the audio mutex, which acquires that mutex
 * immediately.
 */
Client::Client() {
    // no stream settings are known until a stream is opened
    streamRate = 0;
    streamSampleFormat = 0;
    streamChannels = 0;

    // no PortAudio stream is opened for now
    stream = nullptr;

    // constructing the lock from the mutex locks it right away
    audioLock = std::unique_lock(audioMutex);
}
void Client::updateStream(const int channels, const std::uint32_t sampleFormat, const double sampleRate) {
// check if any information changed. If no, ignore this
if (
this->streamChannels == channels &&
this->streamSampleFormat == sampleFormat &&
this->streamRate == sampleRate
)
return;
// close the current stream
// ignore errors that could happen if no audio is currently playing
Pa_CloseStream(&this->stream);
// open a new stream with the new settings
if (const PaError error = Pa_OpenDefaultStream(
&this->stream,
0,
channels,
sampleFormat,
sampleRate,
paFramesPerBufferUnspecified,
nullptr,
nullptr
) != paNoError)
throw std::runtime_error("[Client] Could not open the stream: " + std::string(Pa_GetErrorText(error)));
// update the new audio values
this->streamChannels = channels;
this->streamSampleFormat = sampleFormat;
this->streamRate = sampleRate;
}
/**
 * Stop the playback and release the PortAudio stream, if one was opened.
 *
 * A failure to close the stream is only reported on the standard error output.
 */
Client::~Client() {
    // no stream was opened: there is nothing to stop nor to close,
    // and calling PortAudio with a null stream would only report a spurious error
    if (this->stream == nullptr)
        return;

    // stop any currently playing audio
    // the result is ignored since the stream might not be playing
    Pa_StopStream(this->stream);

    // close the audio stream
    if (const PaError error = Pa_CloseStream(this->stream); error != paNoError)
        std::cerr << "[Client] Could not close the stream: " << std::string(Pa_GetErrorText(error)) << std::endl;
}
void Client::loop() {
// run an audio receiver alongside an audio player
this->receiverThread = std::thread(&Client::loopReceiver, this);
this->playerThread = std::thread(&Client::loopPlayer, this);
this->receiverThread.join();
this->playerThread.join();
}
/**
 * Receive audio packets from the network and queue them for the player.
 *
 * An IPv6 UDP socket is created and bound to port 5650, then every datagram
 * received is copied into the audio queue and the player is notified.
 * This function never returns normally.
 *
 * @throws std::runtime_error if the socket could not be created, if the
 *         address could not be resolved, or if the socket could not be bound
 */
void Client::loopReceiver() {
    // create the socket
    const int clientSocket = socket(
        AF_INET6,
        SOCK_DGRAM,
        0
    );
    if (clientSocket < 0)
        // socket() reports its failure through errno, which is decoded by strerror (not gai_strerror)
        throw std::runtime_error("[Client] Could not create the socket: " + std::string(std::strerror(errno)));

    // describe the address to bind to
    addrinfo serverHints = {};
    serverHints.ai_family = AF_INET6;
    serverHints.ai_socktype = SOCK_DGRAM;
    serverHints.ai_protocol = IPPROTO_UDP;
    // NOTE(review): AI_PASSIVE is not set, so with a null hostname getaddrinfo resolves
    // to the loopback address — confirm that receiving only local datagrams is intended

    // TODO(Faraphel): port as argument
    addrinfo *serverInfo = nullptr;
    const int addressStatus = getaddrinfo(
        nullptr,  // hostname
        "5650",  // our port
        &serverHints,
        &serverInfo
    );
    if (addressStatus != 0)
        // getaddrinfo() returns its own error code, which is the one gai_strerror expects
        throw std::runtime_error("[Client] Could not get the address: " + std::string(gai_strerror(addressStatus)));

    // bind the socket to the address
    if (bind(
        clientSocket,
        serverInfo->ai_addr,
        serverInfo->ai_addrlen
    ) < 0) {
        // save errno before releasing the address, since the cleanup might overwrite it
        const int bindError = errno;
        freeaddrinfo(serverInfo);
        throw std::runtime_error("[Client] Could not bind to the address: " + std::string(std::strerror(bindError)));
    }

    // free the server address
    freeaddrinfo(serverInfo);

    // prepare space for the server address
    sockaddr_storage serverAddress {};

    // prepare space for the received audio
    AudioPacket audioPacket;

    // receive new audio data
    while (true) {
        // the address length is an in/out argument of recvfrom:
        // it must hold the size of the address buffer before every call
        socklen_t serverAddressLength = sizeof(serverAddress);

        const ssize_t size = recvfrom(
            clientSocket,
            &audioPacket,
            sizeof(audioPacket),
            0,
            reinterpret_cast<sockaddr *>(&serverAddress),
            &serverAddressLength
        );
        if (size == -1) {
            std::cerr << "[Client] Could not receive from the socket: " << std::strerror(errno) << std::endl;
            continue;
        }

        // save the audio data into the queue for the player
        // NOTE(review): the queue is modified here without holding audioMutex —
        // confirm how this access is synchronized with loopPlayer
        std::cout << "[Client] Received: " << size << " bytes" << std::endl;
        this->audioQueue.push(audioPacket);

        // notify that a new audio chunk is available
        this->audioCondition.notify_one();
    }
}
void Client::loopPlayer() {
while (true) {
// wait for a new element in the audio queue
this->audioCondition.wait(
this->audioLock,
[this] { return !this->audioQueue.empty(); }
);
// get the most recent audio chunk
const auto audioPacket = this->audioQueue.top();
// update the stream with the new audio settings
this->updateStream(
audioPacket.channels,
audioPacket.sampleFormat,
audioPacket.sampleRate
);
// wait until it must be played
std::this_thread::sleep_until(audioPacket.timePlay);
std::cout << "[Client] Playing: " << audioPacket.timePlay << std::endl;
// immediately stop playing music
// this avoids an offset created if this client's clock is too ahead of the others
// don't handle errors since audio might not be playing before
Pa_AbortStream(this->stream);
// play the new audio data
if (const int error = Pa_StartStream(this->stream) != paNoError)
throw std::runtime_error("[Client] Could not start the PortAudio stream: " + std::string(Pa_GetErrorText(error)));
// write the new audio data into the audio buffer
const int error = Pa_WriteStream(
this->stream,
audioPacket.content.data(),
audioPacket.contentSize / Pa_GetSampleSize(this->streamSampleFormat) / this->streamChannels
);
switch (error) {
// success
case paNoError:
// the output might be very slightly underflowed,
// causing a very small period where no noise will be played.
case paOutputUnderflowed:
break;
default:
std::cerr << "[Client] Could not write to the audio stream: " << Pa_GetErrorText(error) << std::endl;
}
// remove the audio chunk
this->audioQueue.pop();
}
}