diff --git a/CMakeLists.txt b/CMakeLists.txt index e74c799..47ab947 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,12 +12,12 @@ pkg_check_modules(PORTAUDIO REQUIRED portaudio-2.0) add_executable(M2-PT-DRP + source/main.cpp source/Client.cpp source/Client.hpp source/Server.cpp source/Server.hpp - source/main.cpp - source/main.hpp + source/packets/AudioPacket.hpp ) target_include_directories(M2-PT-DRP PRIVATE ${MPG123_INCLUDE_DIRS} diff --git a/source/Client.cpp b/source/Client.cpp index 66ac9d4..6678b80 100644 --- a/source/Client.cpp +++ b/source/Client.cpp @@ -3,16 +3,28 @@ #include #include #include +#include +#include #include +#include #include #include #include +#include "packets/AudioPacket.hpp" + + +struct AudioPacketsComparator { + bool operator() (const AudioPacket &a, const AudioPacket &b) const { + return a.timePlay > b.timePlay; + } +}; + Client::Client(const int channels, const double rate) { this->channels = channels; - // TODO(Faraphel): make the sampleFormat and the framesPerBuffer arguments. + // TODO(Faraphel): make the sampleFormat an argument. // open a PortAudio stream if (Pa_OpenDefaultStream( &this->stream, @@ -20,7 +32,7 @@ Client::Client(const int channels, const double rate) { channels, paInt16, rate, - 512, + paFramesPerBufferUnspecified, nullptr, nullptr ) != paNoError) @@ -34,7 +46,7 @@ Client::~Client() { } -void Client::loop() { +void Client::loop() const { int error; // start the PortAudio stream @@ -59,7 +71,7 @@ void Client::loop() { // TODO(Faraphel): port as argument addrinfo *serverInfo; if((error = getaddrinfo( - nullptr, // any source + nullptr, // hostname "5650", // our port &serverHints, &serverInfo @@ -79,27 +91,53 @@ void Client::loop() { sockaddr_storage serverAddress {}; socklen_t serverAddressLength; - std::vector buffer(4096); + + AudioPacket audioPacketReceived; + std::priority_queue, AudioPacketsComparator> audioQueue; // receive new audio data - while (ssize_t size = recvfrom( - clientSocket, - buffer.data(), - buffer.size(), - 0, - reinterpret_cast(&serverAddress), - &serverAddressLength - )) { + while (true) { + const ssize_t size = recvfrom( + clientSocket, + &audioPacketReceived, + sizeof(audioPacketReceived), + 0, + reinterpret_cast(&serverAddress), + &serverAddressLength + ); + if (size == -1) { + std::cout << "Could not receive from the socket : " << gai_strerror(errno) << std::endl; + continue; + } std::cout << "[Client] received: " << size << " bytes" << std::endl; - // TODO(Faraphel): check for the error ? - error = Pa_WriteStream( - this->stream, - buffer.data(), - size / 2 / this->channels - ); + audioQueue.push(audioPacketReceived); - // play the audio buffer - // TODO(Faraphel): the number of frames could be improved + // TODO(Faraphel): the receiver and the player shall be in two different thread ! + // the music can't play nicely if not done. + while (!audioQueue.empty()) { + const auto& audioPacket = audioQueue.top(); + if (audioPacket.timePlay <= std::chrono::high_resolution_clock::now()) { + std::cout << "playing: " << audioPacket.timePlay << std::endl; + + // TODO(Faraphel): check for the error ? + // TODO(Faraphel) / 2 => / encoding size + // TODO(Faraphel): the number of frames could be improved + // TODO(Faraphel): more comments + // play the audio buffer + error = Pa_WriteStream( + this->stream, + audioPacket.content.data(), + audioPacket.contentSize / 2 / this->channels + ); + + if (error != paNoError) + std::cerr << "Could not write to the stream : " << Pa_GetErrorText(error) << std::endl; + + audioQueue.pop(); + } + else + break; + } } } diff --git a/source/Client.hpp b/source/Client.hpp index 34225b6..ebf3e93 100644 --- a/source/Client.hpp +++ b/source/Client.hpp @@ -9,7 +9,7 @@ public: explicit Client(int channels, double rate); ~Client(); - void loop(); + void loop() const; private: PaStream* stream = nullptr; diff --git a/source/Server.cpp b/source/Server.cpp index f747669..a19cedb 100644 --- a/source/Server.cpp +++ b/source/Server.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -9,6 +10,8 @@ #include #include +#include "packets/AudioPacket.hpp" + Server::Server() { // create a new mpg123 handle @@ -38,58 +41,62 @@ Server::~Server() { mpg123_delete(this->mpgHandle); } - void Server::loop() { int error; - // create the socket - const int serverSocket = socket( - AF_INET6, - SOCK_DGRAM, - 0 - ); - if (serverSocket < 0) - throw std::runtime_error("Could not create the socket."); - // get the broadcast address - addrinfo clientHints = {}; - clientHints.ai_family = AF_INET6; - clientHints.ai_socktype = SOCK_DGRAM; - clientHints.ai_protocol = IPPROTO_UDP; + addrinfo broadcastHints {}; + broadcastHints.ai_family = AF_INET6; + broadcastHints.ai_socktype = SOCK_DGRAM; + broadcastHints.ai_protocol = IPPROTO_UDP; // TODO(Faraphel): ip / port as argument - addrinfo *clientInfo; + addrinfo *broadcastInfo; if((error = getaddrinfo( - "localhost", - // "ff02::1", // IPv6 local broadcast - "5650", // our port - &clientHints, - &clientInfo + "::1", + "5650", + &broadcastHints, + &broadcastInfo )) != 0) throw std::runtime_error("Could not get the address.\n" + std::string(gai_strerror(error))); + const int broadcastSocket = socket( + broadcastInfo->ai_family, + broadcastInfo->ai_socktype, + broadcastInfo->ai_protocol + ); + if (broadcastSocket == -1) + throw std::runtime_error("Could not create the socket: " + std::string(gai_strerror(errno))); // read the file - std::vector buffer(4096); + AudioPacket audioPacket; std::size_t done; while (mpg123_read( this->mpgHandle, - buffer.data(), - buffer.size(), - &done) == MPG123_OK - ) { - // send the content of the file - sendto( - serverSocket, - buffer.data(), - buffer.size(), - 0, - clientInfo->ai_addr, - clientInfo->ai_addrlen - ); + &audioPacket.content, + std::size(audioPacket.content), + &done + ) == MPG123_OK) { + // set the target time + audioPacket.timePlay = + std::chrono::high_resolution_clock::now() + + std::chrono::milliseconds(5000); - std::cout << "[Server] sent : " << done << " bytes" << std::endl; + // set the size of the content + audioPacket.contentSize = done; + + if (sendto( + broadcastSocket, + &audioPacket, + sizeof(audioPacket), + 0, + broadcastInfo->ai_addr, + broadcastInfo->ai_addrlen + ) == -1) + std::cerr << "Could not send audio packet : " << strerror(errno) << std::endl; + else + std::cout << "[Server] sent : " << done << " bytes" << std::endl; // wait 10ms to simulate lag // TODO(Faraphel): should be extended to simulate live music streaming @@ -101,7 +108,7 @@ void Server::loop() { } // free the server address - freeaddrinfo(clientInfo); + freeaddrinfo(broadcastInfo); } diff --git a/source/Server.hpp b/source/Server.hpp index 4b0d869..df1e28e 100644 --- a/source/Server.hpp +++ b/source/Server.hpp @@ -15,7 +15,7 @@ public: private: mpg123_handle* mpgHandle; - long rate; - int channels; - int encoding; + long rate{}; + int channels{}; + int encoding{}; }; diff --git a/source/main.cpp b/source/main.cpp index ad16d77..78c32f4 100644 --- a/source/main.cpp +++ b/source/main.cpp @@ -1,5 +1,3 @@ -#include "main.hpp" - #include #include #include diff --git a/source/main.hpp b/source/main.hpp deleted file mode 100644 index 6f70f09..0000000 --- a/source/main.hpp +++ /dev/null @@ -1 +0,0 @@ -#pragma once diff --git a/source/packets/AudioPacket.hpp b/source/packets/AudioPacket.hpp new file mode 100644 index 0000000..f0ee918 --- /dev/null +++ b/source/packets/AudioPacket.hpp @@ -0,0 +1,13 @@ +#pragma once + +#include +#include + + +struct AudioPacket { + // scheduling + std::chrono::time_point timePlay; + // content + std::uint16_t contentSize; + std::array content; +};