implemented schedule and queue system to make the audio play at a specific time

This commit is contained in:
faraphel 2024-10-21 20:56:00 +02:00
parent 6d0d375707
commit ced6dea00e
8 changed files with 121 additions and 66 deletions

View file

@ -12,12 +12,12 @@ pkg_check_modules(PORTAUDIO REQUIRED portaudio-2.0)
add_executable(M2-PT-DRP add_executable(M2-PT-DRP
source/main.cpp
source/Client.cpp source/Client.cpp
source/Client.hpp source/Client.hpp
source/Server.cpp source/Server.cpp
source/Server.hpp source/Server.hpp
source/main.cpp source/packets/AudioPacket.hpp
source/main.hpp
) )
target_include_directories(M2-PT-DRP PRIVATE target_include_directories(M2-PT-DRP PRIVATE
${MPG123_INCLUDE_DIRS} ${MPG123_INCLUDE_DIRS}

View file

@ -3,16 +3,28 @@
#include <cstring> #include <cstring>
#include <cstdint> #include <cstdint>
#include <iostream> #include <iostream>
#include <list>
#include <map>
#include <netdb.h> #include <netdb.h>
#include <queue>
#include <stdexcept> #include <stdexcept>
#include <vector> #include <vector>
#include <sys/socket.h> #include <sys/socket.h>
#include "packets/AudioPacket.hpp"
struct AudioPacketsComparator {
bool operator() (const AudioPacket &a, const AudioPacket &b) const {
return a.timePlay > b.timePlay;
}
};
Client::Client(const int channels, const double rate) { Client::Client(const int channels, const double rate) {
this->channels = channels; this->channels = channels;
// TODO(Faraphel): make the sampleFormat and the framesPerBuffer arguments. // TODO(Faraphel): make the sampleFormat an argument.
// open a PortAudio stream // open a PortAudio stream
if (Pa_OpenDefaultStream( if (Pa_OpenDefaultStream(
&this->stream, &this->stream,
@ -20,7 +32,7 @@ Client::Client(const int channels, const double rate) {
channels, channels,
paInt16, paInt16,
rate, rate,
512, paFramesPerBufferUnspecified,
nullptr, nullptr,
nullptr nullptr
) != paNoError) ) != paNoError)
@ -34,7 +46,7 @@ Client::~Client() {
} }
void Client::loop() { void Client::loop() const {
int error; int error;
// start the PortAudio stream // start the PortAudio stream
@ -59,7 +71,7 @@ void Client::loop() {
// TODO(Faraphel): port as argument // TODO(Faraphel): port as argument
addrinfo *serverInfo; addrinfo *serverInfo;
if((error = getaddrinfo( if((error = getaddrinfo(
nullptr, // any source nullptr, // hostname
"5650", // our port "5650", // our port
&serverHints, &serverHints,
&serverInfo &serverInfo
@ -79,27 +91,53 @@ void Client::loop() {
sockaddr_storage serverAddress {}; sockaddr_storage serverAddress {};
socklen_t serverAddressLength; socklen_t serverAddressLength;
std::vector<std::uint8_t> buffer(4096);
AudioPacket audioPacketReceived;
std::priority_queue<AudioPacket, std::vector<AudioPacket>, AudioPacketsComparator> audioQueue;
// receive new audio data // receive new audio data
while (ssize_t size = recvfrom( while (true) {
const ssize_t size = recvfrom(
clientSocket, clientSocket,
buffer.data(), &audioPacketReceived,
buffer.size(), sizeof(audioPacketReceived),
0, 0,
reinterpret_cast<sockaddr *>(&serverAddress), reinterpret_cast<sockaddr *>(&serverAddress),
&serverAddressLength &serverAddressLength
)) { );
if (size == -1) {
std::cout << "Could not receive from the socket : " << gai_strerror(errno) << std::endl;
continue;
}
std::cout << "[Client] received: " << size << " bytes" << std::endl; std::cout << "[Client] received: " << size << " bytes" << std::endl;
audioQueue.push(audioPacketReceived);
// TODO(Faraphel): the receiver and the player shall be in two different thread !
// the music can't play nicely if not done.
while (!audioQueue.empty()) {
const auto& audioPacket = audioQueue.top();
if (audioPacket.timePlay <= std::chrono::high_resolution_clock::now()) {
std::cout << "playing: " << audioPacket.timePlay << std::endl;
// TODO(Faraphel): check for the error ? // TODO(Faraphel): check for the error ?
// TODO(Faraphel) / 2 => / encoding size
// TODO(Faraphel): the number of frames could be improved
// TODO(Faraphel): more comments
// play the audio buffer
error = Pa_WriteStream( error = Pa_WriteStream(
this->stream, this->stream,
buffer.data(), audioPacket.content.data(),
size / 2 / this->channels audioPacket.contentSize / 2 / this->channels
); );
// play the audio buffer if (error != paNoError)
// TODO(Faraphel): the number of frames could be improved std::cerr << "Could not write to the stream : " << Pa_GetErrorText(error) << std::endl;
audioQueue.pop();
}
else
break;
}
} }
} }

View file

@ -9,7 +9,7 @@ public:
explicit Client(int channels, double rate); explicit Client(int channels, double rate);
~Client(); ~Client();
void loop(); void loop() const;
private: private:
PaStream* stream = nullptr; PaStream* stream = nullptr;

View file

@ -2,6 +2,7 @@
#include <iostream> #include <iostream>
#include <cstdint> #include <cstdint>
#include <cstring>
#include <mpg123.h> #include <mpg123.h>
#include <netdb.h> #include <netdb.h>
#include <stdexcept> #include <stdexcept>
@ -9,6 +10,8 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <vector> #include <vector>
#include "packets/AudioPacket.hpp"
Server::Server() { Server::Server() {
// create a new mpg123 handle // create a new mpg123 handle
@ -38,57 +41,61 @@ Server::~Server() {
mpg123_delete(this->mpgHandle); mpg123_delete(this->mpgHandle);
} }
void Server::loop() { void Server::loop() {
int error; int error;
// create the socket
const int serverSocket = socket(
AF_INET6,
SOCK_DGRAM,
0
);
if (serverSocket < 0)
throw std::runtime_error("Could not create the socket.");
// get the broadcast address // get the broadcast address
addrinfo clientHints = {}; addrinfo broadcastHints {};
clientHints.ai_family = AF_INET6; broadcastHints.ai_family = AF_INET6;
clientHints.ai_socktype = SOCK_DGRAM; broadcastHints.ai_socktype = SOCK_DGRAM;
clientHints.ai_protocol = IPPROTO_UDP; broadcastHints.ai_protocol = IPPROTO_UDP;
// TODO(Faraphel): ip / port as argument // TODO(Faraphel): ip / port as argument
addrinfo *clientInfo; addrinfo *broadcastInfo;
if((error = getaddrinfo( if((error = getaddrinfo(
"localhost", "::1",
// "ff02::1", // IPv6 local broadcast "5650",
"5650", // our port &broadcastHints,
&clientHints, &broadcastInfo
&clientInfo
)) != 0) )) != 0)
throw std::runtime_error("Could not get the address.\n" + std::string(gai_strerror(error))); throw std::runtime_error("Could not get the address.\n" + std::string(gai_strerror(error)));
const int broadcastSocket = socket(
broadcastInfo->ai_family,
broadcastInfo->ai_socktype,
broadcastInfo->ai_protocol
);
if (broadcastSocket == -1)
throw std::runtime_error("Could not create the socket: " + std::string(gai_strerror(errno)));
// read the file // read the file
std::vector<std::uint8_t> buffer(4096); AudioPacket audioPacket;
std::size_t done; std::size_t done;
while (mpg123_read( while (mpg123_read(
this->mpgHandle, this->mpgHandle,
buffer.data(), &audioPacket.content,
buffer.size(), std::size(audioPacket.content),
&done) == MPG123_OK &done
) { ) == MPG123_OK) {
// send the content of the file // set the target time
sendto( audioPacket.timePlay =
serverSocket, std::chrono::high_resolution_clock::now() +
buffer.data(), std::chrono::milliseconds(5000);
buffer.size(),
0,
clientInfo->ai_addr,
clientInfo->ai_addrlen
);
// set the size of the content
audioPacket.contentSize = done;
if (sendto(
broadcastSocket,
&audioPacket,
sizeof(audioPacket),
0,
broadcastInfo->ai_addr,
broadcastInfo->ai_addrlen
) == -1)
std::cerr << "Could not send audio packet : " << strerror(errno) << std::endl;
else
std::cout << "[Server] sent : " << done << " bytes" << std::endl; std::cout << "[Server] sent : " << done << " bytes" << std::endl;
// wait 10ms to simulate lag // wait 10ms to simulate lag
@ -101,7 +108,7 @@ void Server::loop() {
} }
// free the server address // free the server address
freeaddrinfo(clientInfo); freeaddrinfo(broadcastInfo);
} }

View file

@ -15,7 +15,7 @@ public:
private: private:
mpg123_handle* mpgHandle; mpg123_handle* mpgHandle;
long rate; long rate{};
int channels; int channels{};
int encoding; int encoding{};
}; };

View file

@ -1,5 +1,3 @@
#include "main.hpp"
#include <mpg123.h> #include <mpg123.h>
#include <portaudio.h> #include <portaudio.h>
#include <stdexcept> #include <stdexcept>

View file

@ -1 +0,0 @@
#pragma once

View file

@ -0,0 +1,13 @@
#pragma once
#include <chrono>
#include <cstdint>
struct AudioPacket {
// scheduling
std::chrono::time_point<std::chrono::high_resolution_clock> timePlay;
// content
std::uint16_t contentSize;
std::array<std::uint8_t, 65280> content;
};