diff --git a/source/Client.cpp b/source/Client.cpp index 6678b80..d15e815 100644 --- a/source/Client.cpp +++ b/source/Client.cpp @@ -1,27 +1,21 @@ #include "Client.hpp" #include -#include #include #include #include #include #include #include -#include +#include #include #include "packets/AudioPacket.hpp" -struct AudioPacketsComparator { - bool operator() (const AudioPacket &a, const AudioPacket &b) const { - return a.timePlay > b.timePlay; - } -}; - - Client::Client(const int channels, const double rate) { + this->stream = nullptr; + this->audioLock = std::unique_lock(this->audioMutex); this->channels = channels; // TODO(Faraphel): make the sampleFormat an argument. @@ -46,13 +40,16 @@ Client::~Client() { } -void Client::loop() const { - int error; +void Client::loop() { + // run an audio receiver alongside an audio player + this->receiverThread = std::thread(&Client::loopReceiver, this); + this->playerThread = std::thread(&Client::loopPlayer, this); - // start the PortAudio stream - if (Pa_StartStream(stream) != paNoError) - throw std::runtime_error("Could not start the PortAudio stream."); + this->receiverThread.join(); + this->playerThread.join(); +} +void Client::loopReceiver() { // create the socket const int clientSocket = socket( AF_INET6, @@ -60,7 +57,7 @@ void Client::loop() const { 0 ); if (clientSocket < 0) - throw std::runtime_error("Could not create the socket."); + throw std::runtime_error("Could not create the socket." + std::string(gai_strerror(errno))); // get the broadcast address addrinfo serverHints = {}; @@ -70,15 +67,15 @@ void Client::loop() const { // TODO(Faraphel): port as argument addrinfo *serverInfo; - if((error = getaddrinfo( + if(getaddrinfo( nullptr, // hostname "5650", // our port &serverHints, &serverInfo - )) != 0) - throw std::runtime_error("Could not get the address.\n" + std::string(gai_strerror(error))); + ) != 0) + throw std::runtime_error("Could not get the address.\n" + std::string(gai_strerror(errno))); - // bind to this address + // bind the socket to the address if (bind( clientSocket, serverInfo->ai_addr, @@ -89,55 +86,78 @@ void Client::loop() const { // free the server address freeaddrinfo(serverInfo); + // prepare space for the server address sockaddr_storage serverAddress {}; socklen_t serverAddressLength; - - AudioPacket audioPacketReceived; - std::priority_queue, AudioPacketsComparator> audioQueue; + // prepare space for the received audio + AudioPacket audioPacket; // receive new audio data while (true) { + // receive new audio data const ssize_t size = recvfrom( clientSocket, - &audioPacketReceived, - sizeof(audioPacketReceived), + &audioPacket, + sizeof(audioPacket), 0, reinterpret_cast(&serverAddress), &serverAddressLength ); if (size == -1) { - std::cout << "Could not receive from the socket : " << gai_strerror(errno) << std::endl; + std::cerr << "Could not receive from the socket : " << gai_strerror(errno) << std::endl; continue; } - std::cout << "[Client] received: " << size << " bytes" << std::endl; - audioQueue.push(audioPacketReceived); - - // TODO(Faraphel): the receiver and the player shall be in two different thread ! - // the music can't play nicely if not done. - while (!audioQueue.empty()) { - const auto& audioPacket = audioQueue.top(); - if (audioPacket.timePlay <= std::chrono::high_resolution_clock::now()) { - std::cout << "playing: " << audioPacket.timePlay << std::endl; - - // TODO(Faraphel): check for the error ? - // TODO(Faraphel) / 2 => / encoding size - // TODO(Faraphel): the number of frames could be improved - // TODO(Faraphel): more comments - // play the audio buffer - error = Pa_WriteStream( - this->stream, - audioPacket.content.data(), - audioPacket.contentSize / 2 / this->channels - ); - - if (error != paNoError) - std::cerr << "Could not write to the stream : " << Pa_GetErrorText(error) << std::endl; - - audioQueue.pop(); - } - else - break; - } + // save the audio data into the queue for the player + std::cout << "[Client] received : " << size << " bytes" << std::endl; + this->audioQueue.push(audioPacket); + // notify that a new audio chunk is available + this->audioCondition.notify_one(); + } +} + +void Client::loopPlayer() { + // start the PortAudio stream + if (Pa_StartStream(this->stream) != paNoError) + throw std::runtime_error("Could not start the PortAudio stream."); + + while (true) { + // wait for a new element in the audio queue + this->audioCondition.wait( + this->audioLock, + [this] { return !this->audioQueue.empty(); } + ); + // get the most recent audio chunk + const auto audioPacket = audioQueue.top(); + // wait until it must be played + std::this_thread::sleep_until(audioPacket.timePlay); + + // TODO(Faraphel): check for the error ? + // TODO(Faraphel) / 2 => / encoding size + // TODO(Faraphel): the number of frames could be improved + // TODO(Faraphel): more comments + + std::cout << "playing: " << audioPacket.timePlay << std::endl; + + // play the audio buffer + const int error = Pa_WriteStream( + this->stream, + audioPacket.content.data(), + audioPacket.contentSize / 2 / this->channels + ); + switch (error) { + // success + case paNoError: + // the output might be very slightly underflowed, + // causing a very small period where no noise will be played. + case paOutputUnderflowed: + break; + + default: + std::cerr << "Could not write to the PortAudio stream." << Pa_GetErrorText(error) << std::endl; + } + + // remove the audio chunk + this->audioQueue.pop(); } } diff --git a/source/Client.hpp b/source/Client.hpp index ebf3e93..24925bf 100644 --- a/source/Client.hpp +++ b/source/Client.hpp @@ -1,7 +1,21 @@ #pragma once +#include +#include #include +#include +#include + +#include "packets/AudioPacket.hpp" + + +// TODO(Faraphel): should be moved somewhere else +struct AudioPacketsComparator { + bool operator() (const AudioPacket &a, const AudioPacket &b) const { + return a.timePlay > b.timePlay; + } +}; class Client { @@ -9,9 +23,19 @@ public: explicit Client(int channels, double rate); ~Client(); - void loop() const; + void loop(); private: - PaStream* stream = nullptr; + void loopReceiver(); + void loopPlayer(); + + PaStream* stream; int channels; + std::priority_queue, AudioPacketsComparator> audioQueue; + + std::mutex audioMutex; + std::unique_lock audioLock; + std::condition_variable audioCondition; + std::thread receiverThread; + std::thread playerThread; }; diff --git a/source/Server.hpp b/source/Server.hpp index df1e28e..5f80248 100644 --- a/source/Server.hpp +++ b/source/Server.hpp @@ -6,6 +6,7 @@ class Server { public: explicit Server(); ~Server(); + void loop(); long getRate() const; diff --git a/source/main.cpp b/source/main.cpp index 78c32f4..84b8a3e 100644 --- a/source/main.cpp +++ b/source/main.cpp @@ -8,6 +8,8 @@ int main(int argc, char* argv[]) { + // TODO(Faraphel): move in the Client + // initialize the mpg123 library if (mpg123_init() != MPG123_OK) throw std::runtime_error("Error while initializing mpg123."); @@ -18,13 +20,10 @@ int main(int argc, char* argv[]) { // start the client and server Server server; - Client client ( - server.getChannels(), - static_cast(server.getRate()) - ); + Client client(server.getChannels(), static_cast(server.getRate())); - std::thread serverThread(&Server::loop, server); - std::thread clientThread(&Client::loop, client); + std::thread serverThread(&Server::loop, &server); + std::thread clientThread(&Client::loop, &client); serverThread.join(); clientThread.join();