#include "Client.hpp"

// NOTE(review): the names of the system headers were lost in the source as
// received (it contained ten bare `#include` directives). The list below covers
// every standard, POSIX and PortAudio name used in this file; confirm it
// against the original include list.
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>

#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <portaudio.h>

#include "packets/AudioPacket.hpp"


/**
 * Create a client with no open audio stream and zeroed stream settings.
 */
Client::Client() {
    this->stream = nullptr;
    // Associate the member lock with the mutex WITHOUT acquiring it. Locking it
    // here would leave the mutex owned by the constructing thread forever, so
    // the receiver could never protect the queue and the player would end up
    // releasing a mutex it does not own.
    this->audioLock = std::unique_lock(this->audioMutex, std::defer_lock);
    this->streamChannels = 0;
    this->streamSampleFormat = 0;
    this->streamRate = 0;
}


/**
 * Make sure the output stream matches the given audio settings.
 *
 * Nothing happens when a stream is already open with the same settings;
 * otherwise the current stream (if any) is closed and a new default output
 * stream is opened.
 *
 * @param channels number of output channels
 * @param sampleFormat PortAudio sample format of the audio data
 * @param sampleRate sample rate of the audio data
 * @throws std::runtime_error if the new stream cannot be opened
 */
void Client::updateStream(const int channels, const std::uint32_t sampleFormat, const double sampleRate) {
    // check if any information changed. If no, ignore this
    // (a missing stream always forces a reopen, e.g. after a failed attempt)
    if (
        this->stream != nullptr &&
        this->streamChannels == channels &&
        this->streamSampleFormat == sampleFormat &&
        this->streamRate == sampleRate
    )
        return;

    // close the current stream, if any
    // the result is deliberately ignored: the stream is replaced either way
    if (this->stream != nullptr) {
        Pa_CloseStream(this->stream);
        this->stream = nullptr;
    }

    // open a new stream with the new settings
    // the error code is stored on its own line: initialising it from
    // `Pa_OpenDefaultStream(...) != paNoError` would store the comparison result
    const PaError error = Pa_OpenDefaultStream(
        &this->stream,
        0,
        channels,
        sampleFormat,
        sampleRate,
        paFramesPerBufferUnspecified,
        nullptr,
        nullptr
    );
    if (error != paNoError) {
        // never keep a handle that might not refer to a usable stream
        this->stream = nullptr;
        throw std::runtime_error("[Client] Could not open the stream: " + std::string(Pa_GetErrorText(error)));
    }

    // update the new audio values
    this->streamChannels = channels;
    this->streamSampleFormat = sampleFormat;
    this->streamRate = sampleRate;
}


/**
 * Stop the playback and release the audio stream, if one was opened.
 */
Client::~Client() {
    // nothing to release if no stream was ever opened
    if (this->stream == nullptr)
        return;

    // stop any currently playing audio
    // errors are ignored: the stream might simply not be running
    Pa_StopStream(this->stream);

    // close the audio stream
    const PaError error = Pa_CloseStream(this->stream);
    if (error != paNoError)
        std::cerr << "[Client] Could not close the stream: " << std::string(Pa_GetErrorText(error)) << std::endl;
}


/**
 * Run the receiver and the player, each in its own thread, and wait for both.
 */
void Client::loop() {
    // run an audio receiver alongside an audio player
    this->receiverThread = std::thread(&Client::loopReceiver, this);
    this->playerThread = std::thread(&Client::loopPlayer, this);

    this->receiverThread.join();
    this->playerThread.join();
}


/**
 * Receive audio packets over UDP and queue them for the player.
 *
 * Binds an IPv6 datagram socket to port 5650, then loops forever: every
 * datagram received is copied into the audio queue and the player is notified.
 *
 * @throws std::runtime_error if the socket cannot be created, the address
 *         cannot be resolved, or the socket cannot be bound
 */
void Client::loopReceiver() {
    // create the socket
    const int clientSocket = socket(
        AF_INET6,
        SOCK_DGRAM,
        0
    );
    if (clientSocket < 0)
        // socket() reports through errno, which strerror() (not gai_strerror()) decodes
        throw std::runtime_error("[Client] Could not create the socket: " + std::string(std::strerror(errno)));

    // get the address to bind to
    // NOTE(review): with a null hostname and no AI_PASSIVE flag, getaddrinfo()
    // resolves to the loopback address - confirm that this is intended.
    addrinfo serverHints = {};
    serverHints.ai_family = AF_INET6;
    serverHints.ai_socktype = SOCK_DGRAM;
    serverHints.ai_protocol = IPPROTO_UDP;

    // TODO(Faraphel): port as argument
    addrinfo *serverInfo = nullptr;
    const int addressError = getaddrinfo(
        nullptr,  // hostname
        "5650",  // our port
        &serverHints,
        &serverInfo
    );
    if (addressError != 0) {
        close(clientSocket);
        // getaddrinfo() returns its own error code, which gai_strerror() decodes
        throw std::runtime_error("[Client] Could not get the address: " + std::string(gai_strerror(addressError)));
    }

    // bind the socket to the address
    const int bindResult = bind(
        clientSocket,
        serverInfo->ai_addr,
        serverInfo->ai_addrlen
    );
    // keep the error code before any other call can overwrite errno
    const int bindError = errno;

    // free the server address, whether the bind succeeded or not
    freeaddrinfo(serverInfo);

    if (bindResult < 0) {
        close(clientSocket);
        throw std::runtime_error("[Client] Could not bind to the address: " + std::string(std::strerror(bindError)));
    }

    // prepare space for the server address
    sockaddr_storage serverAddress {};

    // prepare space for the received audio
    AudioPacket audioPacket;

    // receive new audio data
    while (true) {
        // the length is a value-result argument: it must hold the size of the
        // address buffer before every call
        socklen_t serverAddressLength = sizeof(serverAddress);

        // receive new audio data
        const ssize_t size = recvfrom(
            clientSocket,
            &audioPacket,
            sizeof(audioPacket),
            0,
            reinterpret_cast<sockaddr *>(&serverAddress),
            &serverAddressLength
        );
        if (size == -1) {
            std::cerr << "[Client] Could not receive from the socket: " << std::strerror(errno) << std::endl;
            continue;
        }

        std::cout << "[Client] Received: " << size << " bytes" << std::endl;

        // save the audio data into the queue for the player
        // the queue is shared with the player thread, so guard the access
        {
            const std::lock_guard queueGuard(this->audioMutex);
            this->audioQueue.push(audioPacket);
        }

        // notify that a new audio chunk is available
        this->audioCondition.notify_one();
    }
}


/**
 * Play the queued audio packets, each one at its scheduled time.
 *
 * Loops forever: waits for a packet, takes it out of the queue, adapts the
 * output stream to its settings, sleeps until its play time, then restarts the
 * stream and writes the packet content to it.
 *
 * @throws std::runtime_error if the stream cannot be opened or started
 */
void Client::loopPlayer() {
    while (true) {
        // wait for a new element in the audio queue
        std::unique_lock queueLock(this->audioMutex);
        this->audioCondition.wait(
            queueLock,
            [this] { return !this->audioQueue.empty(); }
        );

        // take the packet at the top of the queue
        // it is removed right away, under the same lock: popping only after the
        // playback could remove a different packet that arrived in the meantime
        const auto audioPacket = this->audioQueue.top();
        this->audioQueue.pop();

        // let the receiver queue new packets while this one is being played
        queueLock.unlock();

        // update the stream with the new audio settings
        this->updateStream(
            audioPacket.channels,
            audioPacket.sampleFormat,
            audioPacket.sampleRate
        );

        // wait until it must be played
        std::this_thread::sleep_until(audioPacket.timePlay);

        std::cout << "[Client] Playing: " << audioPacket.timePlay << std::endl;

        // immediately stop playing music
        // this avoids an offset created if this client's clock is too ahead of the others
        // don't handle errors since audio might not be playing before
        Pa_AbortStream(this->stream);

        // play the new audio data
        const PaError startError = Pa_StartStream(this->stream);
        if (startError != paNoError)
            throw std::runtime_error("[Client] Could not start the PortAudio stream: " + std::string(Pa_GetErrorText(startError)));

        // write the new audio data into the audio buffer
        // TODO(Faraphel) / 2 => / encoding size
        // TODO(Faraphel): the number of frames could be improved
        const PaError writeError = Pa_WriteStream(
            this->stream,
            audioPacket.content.data(),
            audioPacket.contentSize / 2 / this->streamChannels
        );
        switch (writeError) {
            // success
            case paNoError:
            // the output might be very slightly underflowed,
            // causing a very small period where no noise will be played.
            case paOutputUnderflowed:
                break;

            default:
                std::cerr << "[Client] Could not write to the audio stream: " << Pa_GetErrorText(writeError) << std::endl;
                break;
        }
    }
}