splitted the client player and receiver into two distinct threads

This commit is contained in:
faraphel 2024-10-21 22:05:19 +02:00
parent ced6dea00e
commit b8edda302d
4 changed files with 106 additions and 62 deletions

View file

@ -1,27 +1,21 @@
#include "Client.hpp" #include "Client.hpp"
#include <cstring> #include <cstring>
#include <cstdint>
#include <iostream> #include <iostream>
#include <list> #include <list>
#include <map> #include <map>
#include <netdb.h> #include <netdb.h>
#include <queue> #include <queue>
#include <stdexcept> #include <stdexcept>
#include <vector> #include <thread>
#include <sys/socket.h> #include <sys/socket.h>
#include "packets/AudioPacket.hpp" #include "packets/AudioPacket.hpp"
struct AudioPacketsComparator {
bool operator() (const AudioPacket &a, const AudioPacket &b) const {
return a.timePlay > b.timePlay;
}
};
Client::Client(const int channels, const double rate) { Client::Client(const int channels, const double rate) {
this->stream = nullptr;
this->audioLock = std::unique_lock(this->audioMutex);
this->channels = channels; this->channels = channels;
// TODO(Faraphel): make the sampleFormat an argument. // TODO(Faraphel): make the sampleFormat an argument.
@ -46,13 +40,16 @@ Client::~Client() {
} }
void Client::loop() const { void Client::loop() {
int error; // run an audio receiver alongside an audio player
this->receiverThread = std::thread(&Client::loopReceiver, this);
this->playerThread = std::thread(&Client::loopPlayer, this);
// start the PortAudio stream this->receiverThread.join();
if (Pa_StartStream(stream) != paNoError) this->playerThread.join();
throw std::runtime_error("Could not start the PortAudio stream."); }
void Client::loopReceiver() {
// create the socket // create the socket
const int clientSocket = socket( const int clientSocket = socket(
AF_INET6, AF_INET6,
@ -60,7 +57,7 @@ void Client::loop() const {
0 0
); );
if (clientSocket < 0) if (clientSocket < 0)
throw std::runtime_error("Could not create the socket."); throw std::runtime_error("Could not create the socket." + std::string(gai_strerror(errno)));
// get the broadcast address // get the broadcast address
addrinfo serverHints = {}; addrinfo serverHints = {};
@ -70,15 +67,15 @@ void Client::loop() const {
// TODO(Faraphel): port as argument // TODO(Faraphel): port as argument
addrinfo *serverInfo; addrinfo *serverInfo;
if((error = getaddrinfo( if(getaddrinfo(
nullptr, // hostname nullptr, // hostname
"5650", // our port "5650", // our port
&serverHints, &serverHints,
&serverInfo &serverInfo
)) != 0) ) != 0)
throw std::runtime_error("Could not get the address.\n" + std::string(gai_strerror(error))); throw std::runtime_error("Could not get the address.\n" + std::string(gai_strerror(errno)));
// bind to this address // bind the socket to the address
if (bind( if (bind(
clientSocket, clientSocket,
serverInfo->ai_addr, serverInfo->ai_addr,
@ -89,55 +86,78 @@ void Client::loop() const {
// free the server address // free the server address
freeaddrinfo(serverInfo); freeaddrinfo(serverInfo);
// prepare space for the server address
sockaddr_storage serverAddress {}; sockaddr_storage serverAddress {};
socklen_t serverAddressLength; socklen_t serverAddressLength;
// prepare space for the received audio
AudioPacket audioPacketReceived; AudioPacket audioPacket;
std::priority_queue<AudioPacket, std::vector<AudioPacket>, AudioPacketsComparator> audioQueue;
// receive new audio data // receive new audio data
while (true) { while (true) {
// receive new audio data
const ssize_t size = recvfrom( const ssize_t size = recvfrom(
clientSocket, clientSocket,
&audioPacketReceived, &audioPacket,
sizeof(audioPacketReceived), sizeof(audioPacket),
0, 0,
reinterpret_cast<sockaddr *>(&serverAddress), reinterpret_cast<sockaddr *>(&serverAddress),
&serverAddressLength &serverAddressLength
); );
if (size == -1) { if (size == -1) {
std::cout << "Could not receive from the socket : " << gai_strerror(errno) << std::endl; std::cerr << "Could not receive from the socket : " << gai_strerror(errno) << std::endl;
continue; continue;
} }
std::cout << "[Client] received: " << size << " bytes" << std::endl;
audioQueue.push(audioPacketReceived); // save the audio data into the queue for the player
std::cout << "[Client] received : " << size << " bytes" << std::endl;
// TODO(Faraphel): the receiver and the player shall be in two different thread ! this->audioQueue.push(audioPacket);
// the music can't play nicely if not done. // notify that a new audio chunk is available
while (!audioQueue.empty()) { this->audioCondition.notify_one();
const auto& audioPacket = audioQueue.top(); }
if (audioPacket.timePlay <= std::chrono::high_resolution_clock::now()) { }
std::cout << "playing: " << audioPacket.timePlay << std::endl;
void Client::loopPlayer() {
// TODO(Faraphel): check for the error ? // start the PortAudio stream
// TODO(Faraphel) / 2 => / encoding size if (Pa_StartStream(this->stream) != paNoError)
// TODO(Faraphel): the number of frames could be improved throw std::runtime_error("Could not start the PortAudio stream.");
// TODO(Faraphel): more comments
// play the audio buffer while (true) {
error = Pa_WriteStream( // wait for a new element in the audio queue
this->stream, this->audioCondition.wait(
audioPacket.content.data(), this->audioLock,
audioPacket.contentSize / 2 / this->channels [this] { return !this->audioQueue.empty(); }
); );
// get the most recent audio chunk
if (error != paNoError) const auto audioPacket = audioQueue.top();
std::cerr << "Could not write to the stream : " << Pa_GetErrorText(error) << std::endl; // wait until it must be played
std::this_thread::sleep_until(audioPacket.timePlay);
audioQueue.pop();
} // TODO(Faraphel): check for the error ?
else // TODO(Faraphel) / 2 => / encoding size
break; // TODO(Faraphel): the number of frames could be improved
} // TODO(Faraphel): more comments
std::cout << "playing: " << audioPacket.timePlay << std::endl;
// play the audio buffer
const int error = Pa_WriteStream(
this->stream,
audioPacket.content.data(),
audioPacket.contentSize / 2 / this->channels
);
switch (error) {
// success
case paNoError:
// the output might be very slightly underflowed,
// causing a very small period where no noise will be played.
case paOutputUnderflowed:
break;
default:
std::cerr << "Could not write to the PortAudio stream." << Pa_GetErrorText(error) << std::endl;
}
// remove the audio chunk
this->audioQueue.pop();
} }
} }

View file

@ -1,7 +1,21 @@
#pragma once #pragma once
#include <condition_variable>
#include <mutex>
#include <portaudio.h> #include <portaudio.h>
#include <queue>
#include <thread>
#include "packets/AudioPacket.hpp"
// TODO(Faraphel): should be moved somewhere else
struct AudioPacketsComparator {
bool operator() (const AudioPacket &a, const AudioPacket &b) const {
return a.timePlay > b.timePlay;
}
};
class Client { class Client {
@ -9,9 +23,19 @@ public:
explicit Client(int channels, double rate); explicit Client(int channels, double rate);
~Client(); ~Client();
void loop() const; void loop();
private: private:
PaStream* stream = nullptr; void loopReceiver();
void loopPlayer();
PaStream* stream;
int channels; int channels;
std::priority_queue<AudioPacket, std::vector<AudioPacket>, AudioPacketsComparator> audioQueue;
std::mutex audioMutex;
std::unique_lock<std::mutex> audioLock;
std::condition_variable audioCondition;
std::thread receiverThread;
std::thread playerThread;
}; };

View file

@ -6,6 +6,7 @@ class Server {
public: public:
explicit Server(); explicit Server();
~Server(); ~Server();
void loop(); void loop();
long getRate() const; long getRate() const;

View file

@ -8,6 +8,8 @@
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
// TODO(Faraphel): move in the Client
// initialize the mpg123 library // initialize the mpg123 library
if (mpg123_init() != MPG123_OK) if (mpg123_init() != MPG123_OK)
throw std::runtime_error("Error while initializing mpg123."); throw std::runtime_error("Error while initializing mpg123.");
@ -18,13 +20,10 @@ int main(int argc, char* argv[]) {
// start the client and server // start the client and server
Server server; Server server;
Client client ( Client client(server.getChannels(), static_cast<double>(server.getRate()));
server.getChannels(),
static_cast<double>(server.getRate())
);
std::thread serverThread(&Server::loop, server); std::thread serverThread(&Server::loop, &server);
std::thread clientThread(&Client::loop, client); std::thread clientThread(&Client::loop, &client);
serverThread.join(); serverThread.join();
clientThread.join(); clientThread.join();