reorganised the way the program share data (Context, Peer, RemotePeer)

This commit is contained in:
faraphel 2024-11-04 23:40:40 +01:00
parent 23732795d0
commit 218ee71118
25 changed files with 190 additions and 77 deletions

View file

@ -14,7 +14,7 @@ pkg_check_modules(OpenSSL REQUIRED openssl)
add_executable(M2-PT-DRP add_executable(M2-PT-DRP
source/main.cpp source/main.cpp
source/packets/AudioPacket.hpp source/packets/audio/AudioPacketData.hpp
source/utils/audio.cpp source/utils/audio.cpp
source/utils/audio.hpp source/utils/audio.hpp
source/EventManager.cpp source/EventManager.cpp
@ -22,7 +22,7 @@ add_executable(M2-PT-DRP
source/packets/base/GenericPacket.hpp source/packets/base/GenericPacket.hpp
source/events/types.hpp source/events/types.hpp
source/packets/base/GenericPacket.cpp source/packets/base/GenericPacket.cpp
source/Peer.hpp source/RemotePeer.hpp
source/utils/StatList.cpp source/utils/StatList.cpp
source/utils/StatList.hpp source/utils/StatList.hpp
source/events/base/BaseEvent.hpp source/events/base/BaseEvent.hpp
@ -45,6 +45,7 @@ add_executable(M2-PT-DRP
source/tasks/undefined/UndefinedTask.hpp source/tasks/undefined/UndefinedTask.hpp
source/tasks/client/ClientTask.cpp source/tasks/client/ClientTask.cpp
source/tasks/client/ClientTask.hpp source/tasks/client/ClientTask.hpp
source/Context.hpp
) )
target_include_directories(M2-PT-DRP PRIVATE target_include_directories(M2-PT-DRP PRIVATE
${MPG123_INCLUDE_DIRS} ${MPG123_INCLUDE_DIRS}

15
source/Context.hpp Normal file
View file

@ -0,0 +1,15 @@
#pragma once
#include "RemotePeer.hpp"
#include "utils/StatList.hpp"
struct Context {
int socket = -1;
drp::util::StatList<std::shared_ptr<RemotePeer>> remotePeers {};
std::shared_ptr<RemotePeer> server = nullptr;
Peer me;
};

View file

@ -2,7 +2,6 @@
#include <algorithm> #include <algorithm>
#include <stdexcept> #include <stdexcept>
#include <cstdint>
#include <cstring> #include <cstring>
#include <iostream> #include <iostream>
#include <map> #include <map>
@ -11,7 +10,6 @@
#include <thread> #include <thread>
#include <sys/socket.h> #include <sys/socket.h>
#include "Peer.hpp"
#include "events/types.hpp" #include "events/types.hpp"
#include "events/audio/AudioEvent.hpp" #include "events/audio/AudioEvent.hpp"
#include "events/info/InfoEvent.hpp" #include "events/info/InfoEvent.hpp"
@ -24,10 +22,6 @@
EventManager::EventManager() { EventManager::EventManager() {
this->channel = 0;
this->server = nullptr;
this->status = drp::task::TaskType::UNDEFINED;
// register the different events type // register the different events type
this->eventRegistry = { this->eventRegistry = {
{drp::event::EventType::PONG, std::make_shared<drp::event::PongEvent>()}, {drp::event::EventType::PONG, std::make_shared<drp::event::PongEvent>()},
@ -51,13 +45,13 @@ EventManager::EventManager() {
addressHints.ai_protocol = IPPROTO_UDP; addressHints.ai_protocol = IPPROTO_UDP;
// create the client socket // create the client socket
this->eventSocket = socket( this->context.socket = socket(
addressHints.ai_family, addressHints.ai_family,
addressHints.ai_socktype, addressHints.ai_socktype,
addressHints.ai_protocol addressHints.ai_protocol
); );
if (this->eventSocket < 0) if (this->context.socket < 0)
throw std::runtime_error("[Receiver] Could not create the socket: " + std::string(strerror(errno))); throw std::runtime_error("[Receiver] Could not create the socket: " + std::string(strerror(errno)));
} }
@ -95,7 +89,7 @@ void EventManager::loopSender() {
// get the corresponding task class // get the corresponding task class
std::shared_ptr<drp::task::BaseTask> task; std::shared_ptr<drp::task::BaseTask> task;
try { try {
task = this->taskRegistry.at(this->status); task = this->taskRegistry.at(this->context.me.status);
} catch (const std::out_of_range& exception) { } catch (const std::out_of_range& exception) {
std::cerr << "Unsupported status." << std::endl; std::cerr << "Unsupported status." << std::endl;
continue; continue;
@ -116,8 +110,8 @@ void EventManager::loopSender() {
void EventManager::loopReceiver() { void EventManager::loopReceiver() {
// prepare space for the sender address // prepare space for the sender address
sockaddr_storage senderAddress {}; sockaddr_storage fromAddress {};
socklen_t senderAddressLength = sizeof(senderAddress); socklen_t fromAddressLength = sizeof(fromAddress);
drp::packet::GenericPacket packet {}; drp::packet::GenericPacket packet {};
drp::packet::GenericPacketContent packetContent {}; drp::packet::GenericPacketContent packetContent {};
@ -138,7 +132,7 @@ void EventManager::loopReceiver() {
// bind the socket to the address // bind the socket to the address
if (bind( if (bind(
this->eventSocket, this->context.socket,
senderInfo->ai_addr, senderInfo->ai_addr,
senderInfo->ai_addrlen senderInfo->ai_addrlen
) < 0) ) < 0)
@ -151,18 +145,18 @@ void EventManager::loopReceiver() {
while (true) { while (true) {
// receive new data // receive new data
const ssize_t size = recvfrom( const ssize_t size = recvfrom(
this->eventSocket, this->context.socket,
&packet, &packet,
sizeof(packet), sizeof(packet),
0, 0,
reinterpret_cast<sockaddr*>(&senderAddress), reinterpret_cast<sockaddr*>(&fromAddress),
&senderAddressLength &fromAddressLength
); );
if (size == -1) if (size == -1)
throw std::runtime_error("[Receiver] Could not receive the packet: " + std::string(strerror(errno))); throw std::runtime_error("[Receiver] Could not receive the packet: " + std::string(strerror(errno)));
// if the packet channel is neither 0 (all) nor the current one, ignore it // if the packet channel is neither 0 (all) nor the current one, ignore it
if (packet.channel != 0 && packet.channel != this->channel) if (packet.channel != 0 && packet.channel != this->context.me.channel)
continue; continue;
// decrypt the packet // decrypt the packet
@ -178,6 +172,11 @@ void EventManager::loopReceiver() {
} }
// ask the event class to handle the event // ask the event class to handle the event
event->handle(packetContent); event->handle(
this->context,
packetContent,
reinterpret_cast<sockaddr*>(&fromAddress),
fromAddressLength
);
} }
} }

View file

@ -5,7 +5,8 @@
#include <netdb.h> #include <netdb.h>
#include <thread> #include <thread>
#include "Peer.hpp" #include "Context.hpp"
#include "RemotePeer.hpp"
#include "events/types.hpp" #include "events/types.hpp"
#include "events/base/BaseEvent.hpp" #include "events/base/BaseEvent.hpp"
#include "tasks/types.hpp" #include "tasks/types.hpp"
@ -28,11 +29,5 @@ private:
std::map<drp::event::EventType, std::shared_ptr<drp::event::BaseEvent>> eventRegistry; /// hold the event to call depending on the event type std::map<drp::event::EventType, std::shared_ptr<drp::event::BaseEvent>> eventRegistry; /// hold the event to call depending on the event type
std::map<drp::task::TaskType, std::shared_ptr<drp::task::BaseTask>> taskRegistry; /// hold the task to call depending on the server status std::map<drp::task::TaskType, std::shared_ptr<drp::task::BaseTask>> taskRegistry; /// hold the task to call depending on the server status
StatList<std::shared_ptr<Peer>> peers; /// list of peers found Context context; /// context used between the events types
std::shared_ptr<Peer> server; /// the peer used as a server
int eventSocket; /// the socket used to communicate
drp::task::TaskType status; /// our current status
std::uint8_t channel; /// the packet channel currently used
}; };

View file

@ -1,15 +0,0 @@
#pragma once
#include <sys/socket.h>
#include "tasks/types.hpp"
struct Peer {
// communication
sockaddr address;
socklen_t addressLength;
// information
drp::task::TaskType status;
};

28
source/RemotePeer.hpp Normal file
View file

@ -0,0 +1,28 @@
#pragma once
#include <sys/socket.h>
#include "tasks/types.hpp"
/**
* Contains common information about a certain peer.
*/
struct Peer {
bool serverEnabled = false;
drp::task::TaskType status = drp::task::TaskType::UNDEFINED;
uint8_t channel = 0;
};
/**
* Contains information about a distant peer.
*/
struct RemotePeer {
// communication
sockaddr address {};
socklen_t addressLength = 0;
// information
Peer information;
};

View file

@ -1,5 +1,6 @@
#include "AudioEvent.hpp" #include "AudioEvent.hpp"
#include <cstring>
#include <iostream> #include <iostream>
#include <bits/unique_lock.h> #include <bits/unique_lock.h>
@ -28,8 +29,17 @@ AudioEvent::~AudioEvent() {
} }
void AudioEvent::handle(const packet::GenericPacketContent& content) { void AudioEvent::handle(
this->audioQueue.push(static_cast<packet::AudioPacket>(content)); Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
socklen_t fromAddressLength
) {
// get the audio data in the content
packet::AudioPacketData audioData;
std::memcpy(&audioData, content.data.data(), content.data.size());
// save it in the audio queue
this->audioQueue.push(audioData);
// notify that a new audio chunk is available // notify that a new audio chunk is available
this->audioCondition.notify_one(); this->audioCondition.notify_one();
} }

View file

@ -20,7 +20,12 @@ public:
void updateAudioStream(int channels, std::uint32_t sampleFormat, double sampleRate); void updateAudioStream(int channels, std::uint32_t sampleFormat, double sampleRate);
void loopPlay(); void loopPlay();
void handle(const packet::GenericPacketContent& content) override; void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
socklen_t fromAddressLength
) override;
private: private:
std::thread playerThread; std::thread playerThread;
@ -29,7 +34,7 @@ private:
int streamChannels; int streamChannels;
std::uint32_t streamSampleFormat; std::uint32_t streamSampleFormat;
double streamRate; double streamRate;
std::priority_queue<packet::AudioPacket, std::vector<packet::AudioPacket>, AudioPacketsComparator> audioQueue; std::priority_queue<packet::AudioPacketData, std::vector<packet::AudioPacketData>, AudioPacketsComparator> audioQueue;
std::mutex audioMutex; std::mutex audioMutex;
std::unique_lock<std::mutex> audioLock; std::unique_lock<std::mutex> audioLock;

View file

@ -4,7 +4,7 @@
namespace drp::event { namespace drp::event {
bool AudioPacketsComparator::operator()(const packet::AudioPacket& a, const packet::AudioPacket& b) const { bool AudioPacketsComparator::operator()(const packet::AudioPacketData& a, const packet::AudioPacketData& b) const {
return a.timePlay > b.timePlay; return a.timePlay > b.timePlay;
} }

View file

@ -1,13 +1,13 @@
#pragma once #pragma once
#include "../../packets/audio/AudioPacket.hpp" #include "../../packets/audio/AudioPacketData.hpp"
namespace drp::event { namespace drp::event {
struct AudioPacketsComparator { struct AudioPacketsComparator {
bool operator() (const packet::AudioPacket& a, const packet::AudioPacket& b) const; bool operator() (const packet::AudioPacketData& a, const packet::AudioPacketData& b) const;
}; };

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "../../packets/base/GenericPacket.hpp" #include "../../packets/base/GenericPacket.hpp"
#include "../../Context.hpp"
namespace drp::event { namespace drp::event {
@ -9,7 +10,12 @@ namespace drp::event {
class BaseEvent { class BaseEvent {
public: public:
virtual ~BaseEvent() = default; virtual ~BaseEvent() = default;
virtual void handle(const packet::GenericPacketContent& content) = 0; virtual void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
socklen_t fromAddressLength
) = 0;
}; };

View file

@ -4,17 +4,26 @@
#include "../../tasks/types.hpp" #include "../../tasks/types.hpp"
namespace drp::event { namespace drp::event {
void InfoEvent::handle(const packet::GenericPacketContent& content) { void InfoEvent::handle(
Peer peer {}; Context& context,
peer.address = *reinterpret_cast<sockaddr*>(&senderAddress); const packet::GenericPacketContent& content,
peer.addressLength = senderAddressLength; sockaddr* fromAddress,
peer.status = task::TaskType::UNDEFINED; const socklen_t fromAddressLength
) {
const auto remotePeer = std::make_shared<RemotePeer>();
remotePeer->address = *reinterpret_cast<sockaddr*>(&fromAddress);
remotePeer->addressLength = fromAddressLength;
// TODO(Faraphel): convert the memory from content to Peer.
remotePeer->information = content;
// save it in the peers list // save it in the peers list
this->peers.push_back(peer); context.remotePeers.push_back(remotePeer);
} }

View file

@ -8,7 +8,12 @@ namespace drp::event {
class InfoEvent : public BaseEvent { class InfoEvent : public BaseEvent {
public: public:
void handle(const packet::GenericPacketContent& content) override; void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
socklen_t fromAddressLength
) override;
}; };

View file

@ -6,7 +6,12 @@
namespace drp::event { namespace drp::event {
void PongEvent::handle(const packet::GenericPacketContent& content) { void PongEvent::handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
socklen_t fromAddressLength
) {
std::cout << "[Receiver] Pong." << std::endl; std::cout << "[Receiver] Pong." << std::endl;
} }

View file

@ -8,7 +8,12 @@ namespace drp::event {
class PongEvent : public BaseEvent { class PongEvent : public BaseEvent {
public: public:
void handle(const packet::GenericPacketContent& content) override; void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
socklen_t fromAddressLength
) override;
}; };

View file

@ -13,27 +13,32 @@
namespace drp { namespace drp {
void event::SearchEvent::handle(const packet::GenericPacketContent& content) { void event::SearchEvent::handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
const socklen_t fromAddressLength
) {
packet::GenericPacket packet {}; packet::GenericPacket packet {};
packet::GenericPacketContent packetContent {}; packet::GenericPacketContent packetContent {};
Information information {}; // create the packet header (available to read for everyone)
information.status = task::TaskType::UNDEFINED;
packet.channel = 0; packet.channel = 0;
packet.securityMode = static_cast<std::uint8_t>(packet::SecurityMode::PLAIN); packet.securityMode = static_cast<std::uint8_t>(packet::SecurityMode::PLAIN);
// insert our information into the packet
packetContent.eventType = static_cast<std::uint8_t>(EventType::INFO); packetContent.eventType = static_cast<std::uint8_t>(EventType::INFO);
std::memcpy(&packetContent.data, &information, sizeof(Information)); std::memcpy(&packetContent.data, &context.me, sizeof(context.me));
packet.setContent(packetContent); packet.setContent(packetContent);
// broadcast our information
if (sendto( if (sendto(
clientSocket, context.socket,
&packet, &packet,
sizeof(packet), sizeof(packet),
0, 0,
serverAddress, fromAddress,
serverAddressLength fromAddressLength
) == -1) ) == -1)
std::cerr << "[Receiver] Could not send information: " << strerror(errno) << std::endl; std::cerr << "[Receiver] Could not send information: " << strerror(errno) << std::endl;

View file

@ -7,7 +7,12 @@ namespace drp::event {
class SearchEvent : public BaseEvent { class SearchEvent : public BaseEvent {
public: public:
void handle(const packet::GenericPacketContent& content) override; void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
socklen_t fromAddressLength
) override;
}; };

View file

@ -7,7 +7,7 @@
namespace drp::packet { namespace drp::packet {
struct AudioPacket { struct AudioPacketData {
// scheduling // scheduling
// TODO(Faraphel): use a more "fixed" size format ? // TODO(Faraphel): use a more "fixed" size format ?
std::chrono::time_point<std::chrono::high_resolution_clock> timePlay; std::chrono::time_point<std::chrono::high_resolution_clock> timePlay;

View file

@ -10,6 +10,9 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <vector> #include <vector>
#include "../../packets/audio/AudioPacketData.hpp"
#include "../../utils/audio.hpp"
namespace drp::task { namespace drp::task {
@ -78,7 +81,7 @@ void ServerTask::loop() const {
throw std::runtime_error("[Server] Could not create the socket: " + std::string(gai_strerror(errno))); throw std::runtime_error("[Server] Could not create the socket: " + std::string(gai_strerror(errno)));
// read the file // read the file
AudioPacket audioPacket; packet::AudioPacketData audioPacket;
std::size_t done; std::size_t done;
while (mpg123_read( while (mpg123_read(
@ -95,7 +98,7 @@ void ServerTask::loop() const {
// set the audio settings // set the audio settings
audioPacket.channels = this->channels; audioPacket.channels = this->channels;
audioPacket.sampleFormat = encoding_mpg123_to_PulseAudio(this->encoding); audioPacket.sampleFormat = util::encoding_mpg123_to_PulseAudio(this->encoding);
audioPacket.sampleRate = this->sampleRate; audioPacket.sampleRate = this->sampleRate;
// set the size of the content // set the size of the content
@ -129,4 +132,4 @@ void ServerTask::loop() const {
} }
} }

View file

@ -1,6 +1,8 @@
#pragma once #pragma once
#include <mpg123.h> #include <mpg123.h>
#include "../base/BaseTask.hpp"
namespace drp::task { namespace drp::task {
@ -25,4 +27,4 @@ private:
}; };
} }

View file

@ -26,13 +26,18 @@ void UndefinedTask::handle() {
) { ) {
// verify if there are peers // verify if there are peers
if (this->peers.empty()) { if (this->peers.empty()) {
// if we are alone, become the server // if we are alone in the network
// TODO(Faraphel): only apply if we are capable of emitting !
// check if we are capable of being a server
if (!canEmit())
return;
// set ourselves as the server
this->status = TaskType::SERVER; this->status = TaskType::SERVER;
return; return;
} }
// search for a server in the peers // search for a server among the peers
const auto& server = std::find_if( const auto& server = std::find_if(
this->peers.begin(), this->peers.begin(),
this->peers.end(), this->peers.end(),
@ -51,6 +56,7 @@ void UndefinedTask::handle() {
} }
// prepare a search message // prepare a search message
packet::GenericPacket packet {};
packet.channel = 0; packet.channel = 0;
packet.securityMode = static_cast<std::uint8_t>(packet::SecurityMode::PLAIN); packet.securityMode = static_cast<std::uint8_t>(packet::SecurityMode::PLAIN);
packet._content.eventType = static_cast<std::uint8_t>(event::EventType::SEARCH); packet._content.eventType = static_cast<std::uint8_t>(event::EventType::SEARCH);

View file

@ -1,6 +1,9 @@
#include "StatList.hpp" #include "StatList.hpp"
namespace drp::util {
template<class T> template<class T>
StatList<T>::StatList() { StatList<T>::StatList() {
this->updateModificationTime(); this->updateModificationTime();
@ -28,3 +31,6 @@ template<class T>
void StatList<T>::updateModificationTime() { void StatList<T>::updateModificationTime() {
this->modificationTime = std::chrono::high_resolution_clock::now(); this->modificationTime = std::chrono::high_resolution_clock::now();
} }
}

View file

@ -4,6 +4,9 @@
#include <list> #include <list>
namespace drp::util {
/** /**
* A simple list with additional stat data, such as the date of the latest modification * A simple list with additional stat data, such as the date of the latest modification
* @tparam T the type of elements in the list * @tparam T the type of elements in the list
@ -22,3 +25,6 @@ private:
void updateModificationTime(); void updateModificationTime();
std::chrono::time_point<std::chrono::high_resolution_clock> modificationTime; std::chrono::time_point<std::chrono::high_resolution_clock> modificationTime;
}; };
}

View file

@ -6,6 +6,9 @@
#include <portaudio.h> #include <portaudio.h>
namespace drp::util {
std::uint32_t encoding_mpg123_to_PulseAudio(const int encoding_mpg123) { std::uint32_t encoding_mpg123_to_PulseAudio(const int encoding_mpg123) {
switch (encoding_mpg123) { switch (encoding_mpg123) {
case MPG123_ENC_UNSIGNED_8: case MPG123_ENC_UNSIGNED_8:
@ -25,3 +28,6 @@ std::uint32_t encoding_mpg123_to_PulseAudio(const int encoding_mpg123) {
throw std::runtime_error("Invalid encoding value."); throw std::runtime_error("Invalid encoding value.");
} }
} }
}

View file

@ -3,4 +3,10 @@
#include <cstdint> #include <cstdint>
std::uint32_t encoding_mpg123_to_PulseAudio(int encoding_mpg123); namespace drp::util {
std::uint32_t encoding_mpg123_to_PulseAudio(int encoding_mpg123);
}