restructured the program

This commit is contained in:
faraphel 2024-11-04 00:01:48 +01:00
parent 824a48f8d2
commit 23732795d0
33 changed files with 901 additions and 200 deletions

View file

@ -9,23 +9,50 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
find_package(PkgConfig REQUIRED)
pkg_check_modules(MPG123 REQUIRED libmpg123)
pkg_check_modules(PORTAUDIO REQUIRED portaudio-2.0)
pkg_check_modules(OpenSSL REQUIRED openssl)
add_executable(M2-PT-DRP
source/main.cpp
source/Client.cpp
source/Client.hpp
source/Server.cpp
source/Server.hpp
source/packets/AudioPacket.hpp
source/utils/audio.cpp
source/utils/audio.hpp
source/EventManager.cpp
source/EventManager.hpp
source/packets/base/GenericPacket.hpp
source/events/types.hpp
source/packets/base/GenericPacket.cpp
source/Peer.hpp
source/utils/StatList.cpp
source/utils/StatList.hpp
source/events/base/BaseEvent.hpp
source/events/base/BaseEvent.hpp
source/events/audio/AudioPacketsComparator.cpp
source/events/audio/AudioPacketsComparator.hpp
source/events/audio/AudioEvent.hpp
source/events/audio/AudioEvent.cpp
source/events/pong/PongEvent.cpp
source/events/pong/PongEvent.hpp
source/events/search/SearchEvent.cpp
source/events/search/SearchEvent.hpp
source/events/info/InfoEvent.cpp
source/events/info/InfoEvent.hpp
source/tasks/types.hpp
source/tasks/base/BaseTask.hpp
source/tasks/server/ServerTask.cpp
source/tasks/server/ServerTask.hpp
source/tasks/undefined/UndefinedTask.cpp
source/tasks/undefined/UndefinedTask.hpp
source/tasks/client/ClientTask.cpp
source/tasks/client/ClientTask.hpp
)
target_include_directories(M2-PT-DRP PRIVATE
${MPG123_INCLUDE_DIRS}
${PORTAUDIO_INCLUDE_DIRS}
${OPENSSL_INCLUDE_DIRS}
)
target_link_libraries(M2-PT-DRP PRIVATE
${MPG123_LIBRARIES}
${PORTAUDIO_LIBRARIES}
${OPENSSL_LIBRARIES}
)

View file

@ -1,65 +0,0 @@
#pragma once
#include <condition_variable>
#include <mutex>
#include <portaudio.h>
#include <queue>
#include <thread>
#include "packets/AudioPacket.hpp"
// TODO(Faraphel): should be moved somewhere else
struct AudioPacketsComparator {
bool operator() (const AudioPacket &a, const AudioPacket &b) const {
return a.timePlay > b.timePlay;
}
};
/**
* the audio Client.
* Receive audio packets and play them at a specific time.
*/
class Client {
public:
explicit Client();
~Client();
/**
* Update the current audio stream
* @param channels the number of channels
* @param sampleFormat the sample format type
* @param sampleRate the audio rate
*/
void updateStream(int channels, std::uint32_t sampleFormat, double sampleRate);
/**
* Indefinitely receive and play audio data.
*/
void loop();
private:
/**
* Indefinitely receive audio data.
*/
void loopReceiver();
/**
* Indefinitely play audio data.
*/
void loopPlayer();
PaStream* stream;
int streamChannels;
std::uint32_t streamSampleFormat;
double streamRate;
std::priority_queue<AudioPacket, std::vector<AudioPacket>, AudioPacketsComparator> audioQueue;
std::mutex audioMutex;
std::unique_lock<std::mutex> audioLock;
std::condition_variable audioCondition;
std::thread receiverThread;
std::thread playerThread;
};

183
source/EventManager.cpp Normal file
View file

@ -0,0 +1,183 @@
#include "EventManager.hpp"
#include <algorithm>
#include <stdexcept>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <map>
#include <netdb.h>
#include <ostream>
#include <thread>
#include <sys/socket.h>
#include "Peer.hpp"
#include "events/types.hpp"
#include "events/audio/AudioEvent.hpp"
#include "events/info/InfoEvent.hpp"
#include "events/pong/PongEvent.hpp"
#include "events/search/SearchEvent.hpp"
#include "packets/base/GenericPacket.hpp"
#include "tasks/client/ClientTask.hpp"
#include "tasks/server/ServerTask.hpp"
#include "tasks/undefined/UndefinedTask.hpp"
EventManager::EventManager() {
this->channel = 0;
this->server = nullptr;
this->status = drp::task::TaskType::UNDEFINED;
// register the different events type
this->eventRegistry = {
{drp::event::EventType::PONG, std::make_shared<drp::event::PongEvent>()},
{drp::event::EventType::SEARCH, std::make_shared<drp::event::SearchEvent>()},
{drp::event::EventType::INFO, std::make_shared<drp::event::InfoEvent>()},
{drp::event::EventType::AUDIO, std::make_shared<drp::event::AudioEvent>()},
};
// register the different tasks type
this->taskRegistry = {
{drp::task::TaskType::UNDEFINED, std::make_shared<drp::task::UndefinedTask>()},
{drp::task::TaskType::CLIENT, std::make_shared<drp::task::ClientTask>()},
{drp::task::TaskType::SERVER, std::make_shared<drp::task::ServerTask>()},
};
// hints for the communication
// TODO: should be merged with the two others addressHints
addrinfo addressHints {};
addressHints.ai_family = AF_INET; // TODO: AF_INET6
addressHints.ai_socktype = SOCK_DGRAM;
addressHints.ai_protocol = IPPROTO_UDP;
// create the client socket
this->eventSocket = socket(
addressHints.ai_family,
addressHints.ai_socktype,
addressHints.ai_protocol
);
if (this->eventSocket < 0)
throw std::runtime_error("[Receiver] Could not create the socket: " + std::string(strerror(errno)));
}
void EventManager::loop() {
// run an event receiver and sender
this->senderThread = std::thread(&EventManager::loopSender, this);
this->receiverThread = std::thread(&EventManager::loopReceiver, this);
this->senderThread.join();
this->receiverThread.join();
}
void EventManager::loopSender() {
addrinfo* destinationInfo;
addrinfo addressHints {};
addressHints.ai_family = AF_INET; // TODO: AF_INET6
addressHints.ai_socktype = SOCK_DGRAM;
addressHints.ai_protocol = IPPROTO_UDP;
// get the information for the broadcast local-link address
// TODO(Faraphel): ip / port as argument ?
if(const int error = getaddrinfo(
"localhost", // TODO: ff02::1
"5650",
&addressHints,
&destinationInfo
) != 0)
throw std::runtime_error("[Sender] Could not get the address: " + std::string(gai_strerror(error)));
while (true) {
// get the corresponding task class
std::shared_ptr<drp::task::BaseTask> task;
try {
task = this->taskRegistry.at(this->status);
} catch (const std::out_of_range& exception) {
std::cerr << "Unsupported status." << std::endl;
continue;
}
// ask the task class to handle the task
task->handle();
// wait a second
// TODO(Faraphel): might be moved to the tasks directly ? what if they want a lower cooldown ?
std::this_thread::sleep_for(std::chrono::seconds(1));
}
// free the address
freeaddrinfo(destinationInfo);
}
void EventManager::loopReceiver() {
// prepare space for the sender address
sockaddr_storage senderAddress {};
socklen_t senderAddressLength = sizeof(senderAddress);
drp::packet::GenericPacket packet {};
drp::packet::GenericPacketContent packetContent {};
addrinfo addressHints {};
addressHints.ai_family = AF_INET; // TODO: AF_INET6
addressHints.ai_socktype = SOCK_DGRAM;
addressHints.ai_protocol = IPPROTO_UDP;
// TODO(Faraphel): port as argument
addrinfo* senderInfo;
if(getaddrinfo(
nullptr, // hostname
"5650", // port
&addressHints,
&senderInfo
) != 0)
throw std::runtime_error("[Receiver] Could not get the address: " + std::string(gai_strerror(errno)));
// bind the socket to the address
if (bind(
this->eventSocket,
senderInfo->ai_addr,
senderInfo->ai_addrlen
) < 0)
throw std::runtime_error("[Receiver] Could not bind to the address: " + std::string(strerror(errno)));
// free the sender address
freeaddrinfo(senderInfo);
// client loop
while (true) {
// receive new data
const ssize_t size = recvfrom(
this->eventSocket,
&packet,
sizeof(packet),
0,
reinterpret_cast<sockaddr*>(&senderAddress),
&senderAddressLength
);
if (size == -1)
throw std::runtime_error("[Receiver] Could not receive the packet: " + std::string(strerror(errno)));
// if the packet channel is neither 0 (all) nor the current one, ignore it
if (packet.channel != 0 && packet.channel != this->channel)
continue;
// decrypt the packet
packetContent = packet.getContent();
// get the corresponding event class
std::shared_ptr<drp::event::BaseEvent> event;
try {
event = this->eventRegistry.at(static_cast<drp::event::EventType>(packetContent.eventType));
} catch (const std::out_of_range& exception) {
std::cerr << "Unsupported event type." << std::endl;
continue;
}
// ask the event class to handle the event
event->handle(packetContent);
}
}

38
source/EventManager.hpp Normal file
View file

@ -0,0 +1,38 @@
#pragma once
#include <cstdint>
#include <map>
#include <netdb.h>
#include <thread>
#include "Peer.hpp"
#include "events/types.hpp"
#include "events/base/BaseEvent.hpp"
#include "tasks/types.hpp"
#include "tasks/base/BaseTask.hpp"
#include "utils/StatList.hpp"
class EventManager {
public:
EventManager();
void loop();
void loopSender();
void loopReceiver();
private:
std::thread senderThread; /// the thread sending communication
std::thread receiverThread; /// the thread receiving communication
std::map<drp::event::EventType, std::shared_ptr<drp::event::BaseEvent>> eventRegistry; /// hold the event to call depending on the event type
std::map<drp::task::TaskType, std::shared_ptr<drp::task::BaseTask>> taskRegistry; /// hold the task to call depending on the server status
StatList<std::shared_ptr<Peer>> peers; /// list of peers found
std::shared_ptr<Peer> server; /// the peer used as a server
int eventSocket; /// the socket used to communicate
drp::task::TaskType status; /// our current status
std::uint8_t channel; /// the packet channel currently used
};

15
source/Peer.hpp Normal file
View file

@ -0,0 +1,15 @@
#pragma once
#include <sys/socket.h>
#include "tasks/types.hpp"
struct Peer {
// communication
sockaddr address;
socklen_t addressLength;
// information
drp::task::TaskType status;
};

1
source/events/README.md Normal file
View file

@ -0,0 +1 @@
This directory contains the code describing how to react to a specific event.

View file

@ -1,28 +1,41 @@
#include "Client.hpp"
#include "AudioEvent.hpp"
#include <cstring>
#include <iostream>
#include <list>
#include <map>
#include <netdb.h>
#include <queue>
#include <stdexcept>
#include <thread>
#include <sys/socket.h>
#include "packets/AudioPacket.hpp"
#include <bits/unique_lock.h>
Client::Client() {
namespace drp::event {
AudioEvent::AudioEvent() {
this->stream = nullptr;
this->audioLock = std::unique_lock(this->audioMutex);
this->streamChannels = 0;
this->streamSampleFormat = 0;
this->streamRate = 0;
// start a thread for the player
this->playerThread = std::thread(&AudioEvent::loopPlay, this);
}
void Client::updateStream(const int channels, const std::uint32_t sampleFormat, const double sampleRate) {
AudioEvent::~AudioEvent() {
// stop any currently playing audio
Pa_StopStream(this->stream);
// close the audio stream
if (const PaError error = Pa_CloseStream(this->stream))
std::cerr << "[Client] Could not close the stream: " << std::string(Pa_GetErrorText(error)) << std::endl;
}
void AudioEvent::handle(const packet::GenericPacketContent& content) {
this->audioQueue.push(static_cast<packet::AudioPacket>(content));
// notify that a new audio chunk is available
this->audioCondition.notify_one();
}
void AudioEvent::updateAudioStream(const int channels, const std::uint32_t sampleFormat, const double sampleRate) {
// check if any information changed. If no, ignore this
if (
this->streamChannels == channels &&
@ -54,92 +67,8 @@ void Client::updateStream(const int channels, const std::uint32_t sampleFormat,
this->streamRate = sampleRate;
}
Client::~Client() {
// stop any currently playing audio
Pa_StopStream(this->stream);
// close the audio stream
if (const PaError error = Pa_CloseStream(this->stream))
std::cerr << "[Client] Could not close the stream: " << std::string(Pa_GetErrorText(error)) << std::endl;
}
void Client::loop() {
// run an audio receiver alongside an audio player
this->receiverThread = std::thread(&Client::loopReceiver, this);
this->playerThread = std::thread(&Client::loopPlayer, this);
this->receiverThread.join();
this->playerThread.join();
}
void Client::loopReceiver() {
// create the socket
const int clientSocket = socket(
AF_INET6,
SOCK_DGRAM,
0
);
if (clientSocket < 0)
throw std::runtime_error("[Client] Could not create the socket: " + std::string(gai_strerror(errno)));
// get the broadcast address
addrinfo serverHints = {};
serverHints.ai_family = AF_INET6;
serverHints.ai_socktype = SOCK_DGRAM;
serverHints.ai_protocol = IPPROTO_UDP;
// TODO(Faraphel): port as argument
addrinfo *serverInfo;
if(getaddrinfo(
nullptr, // hostname
"5650", // our port
&serverHints,
&serverInfo
) != 0)
throw std::runtime_error("[Client] Could not get the address: " + std::string(gai_strerror(errno)));
// bind the socket to the address
if (bind(
clientSocket,
serverInfo->ai_addr,
serverInfo->ai_addrlen
) < 0)
throw std::runtime_error("[Client] Could not bind to the address: " + std::string(gai_strerror(errno)));
// free the server address
freeaddrinfo(serverInfo);
// prepare space for the server address
sockaddr_storage serverAddress {};
socklen_t serverAddressLength;
// prepare space for the received audio
AudioPacket audioPacket;
// receive new audio data
while (true) {
// receive new audio data
const ssize_t size = recvfrom(
clientSocket,
&audioPacket,
sizeof(audioPacket),
0,
reinterpret_cast<sockaddr *>(&serverAddress),
&serverAddressLength
);
if (size == -1) {
std::cerr << "[Client] Could not receive from the socket: " << gai_strerror(errno) << std::endl;
continue;
}
// save the audio data into the queue for the player
std::cout << "[Client] Received: " << size << " bytes" << std::endl;
this->audioQueue.push(audioPacket);
// notify that a new audio chunk is available
this->audioCondition.notify_one();
}
}
void Client::loopPlayer() {
void AudioEvent::loopPlay() {
while (true) {
// wait for a new element in the audio queue
this->audioCondition.wait(
@ -150,7 +79,7 @@ void Client::loopPlayer() {
const auto audioPacket = this->audioQueue.top();
// update the stream with the new audio settings
this->updateStream(
this->updateAudioStream(
audioPacket.channels,
audioPacket.sampleFormat,
audioPacket.sampleRate
@ -179,7 +108,7 @@ void Client::loopPlayer() {
switch (error) {
// success
case paNoError:
// the output might be very slightly underflowed,
// the output might be very slightly underflown,
// causing a very small period where no noise will be played.
case paOutputUnderflowed:
break;
@ -192,3 +121,6 @@ void Client::loopPlayer() {
this->audioQueue.pop();
}
}
}

View file

@ -0,0 +1,40 @@
#pragma once
#include <condition_variable>
#include <portaudio.h>
#include <queue>
#include <bits/std_mutex.h>
#include <bits/unique_lock.h>
#include "AudioPacketsComparator.hpp"
#include "../base/BaseEvent.hpp"
namespace drp::event {
class AudioEvent : public BaseEvent {
public:
AudioEvent();
~AudioEvent() override;
void updateAudioStream(int channels, std::uint32_t sampleFormat, double sampleRate);
void loopPlay();
void handle(const packet::GenericPacketContent& content) override;
private:
std::thread playerThread;
PaStream* stream;
int streamChannels;
std::uint32_t streamSampleFormat;
double streamRate;
std::priority_queue<packet::AudioPacket, std::vector<packet::AudioPacket>, AudioPacketsComparator> audioQueue;
std::mutex audioMutex;
std::unique_lock<std::mutex> audioLock;
std::condition_variable audioCondition;
};
}

View file

@ -0,0 +1,12 @@
#include "AudioPacketsComparator.hpp"
namespace drp::event {
bool AudioPacketsComparator::operator()(const packet::AudioPacket& a, const packet::AudioPacket& b) const {
return a.timePlay > b.timePlay;
}
}

View file

@ -0,0 +1,14 @@
#pragma once
#include "../../packets/audio/AudioPacket.hpp"
namespace drp::event {
struct AudioPacketsComparator {
bool operator() (const packet::AudioPacket& a, const packet::AudioPacket& b) const;
};
}

View file

@ -0,0 +1,16 @@
#pragma once
#include "../../packets/base/GenericPacket.hpp"
namespace drp::event {
class BaseEvent {
public:
virtual ~BaseEvent() = default;
virtual void handle(const packet::GenericPacketContent& content) = 0;
};
}

View file

@ -0,0 +1,21 @@
#include "InfoEvent.hpp"
#include <sys/socket.h>
#include "../../tasks/types.hpp"
namespace drp::event {
void InfoEvent::handle(const packet::GenericPacketContent& content) {
Peer peer {};
peer.address = *reinterpret_cast<sockaddr*>(&senderAddress);
peer.addressLength = senderAddressLength;
peer.status = task::TaskType::UNDEFINED;
// save it in the peers list
this->peers.push_back(peer);
}
}

View file

@ -0,0 +1,15 @@
#pragma once
#include "../base/BaseEvent.hpp"
namespace drp::event {
class InfoEvent : public BaseEvent {
public:
void handle(const packet::GenericPacketContent& content) override;
};
}

View file

@ -0,0 +1,14 @@
#include "PongEvent.hpp"
#include <iostream>
namespace drp::event {
void PongEvent::handle(const packet::GenericPacketContent& content) {
std::cout << "[Receiver] Pong." << std::endl;
}
}

View file

@ -0,0 +1,15 @@
#pragma once
#include "../base/BaseEvent.hpp"
namespace drp::event {
class PongEvent : public BaseEvent {
public:
void handle(const packet::GenericPacketContent& content) override;
};
}

View file

@ -0,0 +1,45 @@
#include "SearchEvent.hpp"
#include <cerrno>
#include <sys/socket.h>
#include <cstring>
#include <iostream>
#include <ostream>
#include "../../tasks/types.hpp"
#include "../../events/types.hpp"
namespace drp {
void event::SearchEvent::handle(const packet::GenericPacketContent& content) {
packet::GenericPacket packet {};
packet::GenericPacketContent packetContent {};
Information information {};
information.status = task::TaskType::UNDEFINED;
packet.channel = 0;
packet.securityMode = static_cast<std::uint8_t>(packet::SecurityMode::PLAIN);
packetContent.eventType = static_cast<std::uint8_t>(EventType::INFO);
std::memcpy(&packetContent.data, &information, sizeof(Information));
packet.setContent(packetContent);
if (sendto(
clientSocket,
&packet,
sizeof(packet),
0,
serverAddress,
serverAddressLength
) == -1)
std::cerr << "[Receiver] Could not send information: " << strerror(errno) << std::endl;
std::cout << "[Receiver] Sent information." << std::endl;
}
}

View file

@ -0,0 +1,14 @@
#pragma once
#include "../base/BaseEvent.hpp"
namespace drp::event {
class SearchEvent : public BaseEvent {
public:
void handle(const packet::GenericPacketContent& content) override;
};
}

25
source/events/types.hpp Normal file
View file

@ -0,0 +1,25 @@
#pragma once
namespace drp::event {
enum class EventType {
// debug
PING = 0x00, // simple ping
PONG = 0x01, // ping response
// protocol
SEARCH = 0x10, // search for devices
INFO = 0x011, // information about ourselves
// security
RSA_PUBLIC_KEY = 0x20, // sharing asymmetric public key
AES_SECRET_KEY = 0x21, // sharing symmetric private key
// functionality
AUDIO = 0x30, // play a sound at a given time
};
}

View file

@ -1,10 +1,7 @@
#include <mpg123.h>
#include <portaudio.h>
#include <stdexcept>
#include <thread>
#include "Client.hpp"
#include "Server.hpp"
#include "EventManager.hpp"
int main(int argc, char* argv[]) {
@ -18,15 +15,8 @@ int main(int argc, char* argv[]) {
if (Pa_Initialize() != paNoError)
throw std::runtime_error("Could not initialize PortAudio.");
// start the client and server
Server server;
Client client;
std::thread serverThread(&Server::loop, &server);
std::thread clientThread(&Client::loop, &client);
serverThread.join();
clientThread.join();
EventManager eventManager;
eventManager.loop();
// terminate the libraries
Pa_Terminate();

View file

@ -4,15 +4,21 @@
#include <cstdint>
namespace drp::packet {
struct AudioPacket {
// scheduling
// TODO(Faraphel): use a more "fixed" size format ?
std::chrono::time_point<std::chrono::high_resolution_clock> timePlay;
// audio settings
std::uint8_t channels;
std::uint32_t sampleFormat;
std::uint32_t sampleRate;
std::uint8_t channels {};
std::uint32_t sampleFormat {};
std::uint32_t sampleRate {};
// content
std::uint16_t contentSize;
std::array<std::uint8_t, 65280> content;
std::uint16_t contentSize {};
std::array<std::uint8_t, 65280> content {};
};
}

View file

@ -0,0 +1,90 @@
#include "GenericPacket.hpp"
#include <stdexcept>
namespace drp::packet {
/*
GenericPacketContent decryptPacketContentAes(const GenericPacket& packet) {
GenericPacketContent decryptedPacketContent {};
const auto& [key, iv] = keysAes[serverAddress];
EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new();
if (EVP_DecryptInit_ex(
ctx,
EVP_aes_256_cbc(),
nullptr,
key,
iv
) != 1)
throw std::runtime_error("[Client] Could not initialize the EVP_CIPHER_CTX.");
int packetContentLength;
if (EVP_DecryptUpdate(
ctx,
reinterpret_cast<std::uint8_t*>(&decryptedPacketContent),
&packetContentLength,
reinterpret_cast<const std::uint8_t*>(&packet.encryptedContent),
sizeof(packet)
) != 1)
throw std::runtime_error("[Client] Could not encrypt the plaintext.");
if (EVP_DecryptFinal_ex(
ctx,
reinterpret_cast<std::uint8_t*>(&decryptedPacketContent + packetContentLength),
&packetContentLength
) != 1)
throw std::runtime_error("[Client] Could not decrypt the final plaintext.");
EVP_CIPHER_CTX_free(ctx);
return decryptedPacketContent;
}
*/
GenericPacketContent GenericPacket::getContent() const {
// TODO(Faraphel): implement RSA and AES
// additional "context" argument to hold cryptographic keys ?
switch (static_cast<SecurityMode>(this->securityMode)) {
case SecurityMode::PLAIN:
return this->_content;
case SecurityMode::AES:
// return decryptPacketContentAes(packet);
throw std::runtime_error("Not implemented.");
case SecurityMode::RSA:
throw std::runtime_error("Not implemented.");
default:
throw std::runtime_error("Unsupported security mode.");
}
}
void GenericPacket::setContent(const GenericPacketContent &content) {
// TODO(Faraphel): implement RSA and AES
switch (static_cast<SecurityMode>(this->securityMode)) {
case SecurityMode::PLAIN:
this->_content = content;
return;
case SecurityMode::AES:
throw std::runtime_error("Not implemented.");
case SecurityMode::RSA:
throw std::runtime_error("Not implemented.");
default:
throw std::runtime_error("Unsupported security mode.");
}
}
}

View file

@ -0,0 +1,49 @@
#pragma once
#include <array>
#include <cstdint>
namespace drp::packet {
enum class SecurityMode {
PLAIN = 0x00,
AES = 0x01,
RSA = 0x02,
};
// the maximum data length
// a packet can't be larger than 65565 (uint16 max)
// reserve some space for metadata and settings
constexpr std::uint16_t dataLength = 65504;
/**
* The content of a generic packet.
* @param eventType the type of event that the packet want to trigger.
* @param data the data of the event.
*/
struct GenericPacketContent {
std::uint8_t eventType;
std::array<std::uint8_t, dataLength> data;
};
/**
* A generic packet that can be transmitted through the network.
* @param channel the channel of the packet. Two system can be created inside a same network by using different
* channels value. "0" is used for "broadcast" message across networks.
* @param securityMode the type of security used in the packet.
* @param _content the content of the packet. It is encrypted accordingly to the securityMode.
*/
struct GenericPacket {
std::uint8_t channel;
std::uint8_t securityMode;
GenericPacketContent _content;
[[nodiscard]] GenericPacketContent getContent() const;
void setContent(const GenericPacketContent& content);
};
}

1
source/tasks/README.md Normal file
View file

@ -0,0 +1 @@
This directory contains the code describing how should the machine send event to its peers.

View file

@ -0,0 +1,14 @@
#pragma once
namespace drp::task {
class BaseTask {
public:
virtual ~BaseTask() = default;
virtual void handle() = 0;
};
}

View file

@ -0,0 +1,14 @@
#include "ClientTask.hpp"
namespace drp::task {
void ClientTask::handle() {
// TODO(Faraphel): connect to an ntp server
// TODO(Faraphel): check if the server is still reachable.
// if connection lost, go back to undefined mode.
}
}

View file

@ -0,0 +1,15 @@
#pragma once
#include "../base/BaseTask.hpp"
namespace drp::task {
class ClientTask : public BaseTask {
public:
void handle() override;
};
}

View file

@ -1,4 +1,4 @@
#include "Server.hpp"
#include "ServerTask.hpp"
#include <iostream>
#include <cstdint>
@ -10,11 +10,11 @@
#include <sys/socket.h>
#include <vector>
#include "packets/AudioPacket.hpp"
#include "utils/audio.hpp"
namespace drp::task {
Server::Server() {
ServerTask::ServerTask() {
this->channels = 0;
this->encoding = 0;
this->sampleRate = 0;
@ -44,13 +44,15 @@ Server::Server() {
throw std::runtime_error("[Server] Could not get the format of the file.");
}
Server::~Server() {
ServerTask::~ServerTask() {
// delete the mpg123 handle
mpg123_close(this->mpgHandle);
mpg123_delete(this->mpgHandle);
}
void Server::loop() const {
void ServerTask::loop() const {
// TODO(Faraphel): create a chrony server
// get the broadcast address
addrinfo broadcastHints {};
broadcastHints.ai_family = AF_INET6;
@ -125,3 +127,6 @@ void Server::loop() const {
// free the server address
freeaddrinfo(broadcastInfo);
}
}

View file

@ -2,19 +2,19 @@
#include <mpg123.h>
namespace drp::task {
/**
* the audio Server.
* Read and broadcast audio data.
*/
class Server {
class ServerTask : public BaseTask {
public:
explicit Server();
~Server();
explicit ServerTask();
~ServerTask();
/**
* Indefinitely read and broadcast audio data.
*/
void loop() const;
void handle() override;
private:
mpg123_handle* mpgHandle;
@ -23,3 +23,6 @@ private:
int channels;
int encoding;
};
}

14
source/tasks/types.hpp Normal file
View file

@ -0,0 +1,14 @@
#pragma once
namespace drp::task {
enum class TaskType {
UNDEFINED = 0x00,
CLIENT = 0x01,
SERVER = 0x02,
};
}

View file

@ -0,0 +1,71 @@
#include "UndefinedTask.hpp"
#include "../types.hpp"
#include <chrono>
#include <cstring>
#include <iostream>
#include <sys/socket.h>
#include "../../events/types.hpp"
#include "../../packets/base/GenericPacket.hpp"
namespace drp::task {
void UndefinedTask::handle() {
// TODO(Faraphel): If .status is UNDEFINED, look for a server.
// if alone, become a server (if can emit).
// if everyone is UNKNOWN, elect a server (easiest to join / highest mac address, etc...)
// if a server is found, become a client.
// check if no more peers have been found.
if (
std::chrono::high_resolution_clock::now() - this->peers.getModificationTime() >
std::chrono::milliseconds(5000)
) {
// verify if there are peers
if (this->peers.empty()) {
// if we are alone, become the server
// TODO(Faraphel): only apply if we are capable of emitting !
this->status = TaskType::SERVER;
return;
}
// search for a server in the peers
const auto& server = std::find_if(
this->peers.begin(),
this->peers.end(),
[&](const Peer& peer) { return peer.status == TaskType::SERVER; }
);
if (server != this->peers.end()) {
// if a server have been found, use it
this->server = *server;
this->status = TaskType::CLIENT;
return;
}
// TODO(Faraphel): elect a server among those capable of emitting.
// send the average ping of all the machine in the information packet ?
// add an additionnal random value if equals ?
}
// prepare a search message
packet.channel = 0;
packet.securityMode = static_cast<std::uint8_t>(packet::SecurityMode::PLAIN);
packet._content.eventType = static_cast<std::uint8_t>(event::EventType::SEARCH);
// send the search message
if (sendto(
this->eventSocket,
&packet,
sizeof(packet),
0,
destinationInfo->ai_addr,
destinationInfo->ai_addrlen
) == -1)
std::cerr << "[Sender] Could not send search event: " << strerror(errno) << std::endl;
}
}

View file

@ -0,0 +1,13 @@
#pragma once
#include "../base/BaseTask.hpp"
namespace drp::task {
class UndefinedTask : public BaseTask {
void handle() override;
};
}

30
source/utils/StatList.cpp Normal file
View file

@ -0,0 +1,30 @@
#include "StatList.hpp"
template<class T>
StatList<T>::StatList() {
this->updateModificationTime();
}
template<class T>
void StatList<T>::push_back(const T &value) {
std::list<T>::push_back(value);
this->updateModificationTime();
}
template<class T>
void StatList<T>::pop_back() {
std::list<T>::pop_back();
this->updateModificationTime();
}
template<class T>
std::chrono::time_point<std::chrono::high_resolution_clock> StatList<T>::getModificationTime() const {
return this->modificationTime;
}
template<class T>
void StatList<T>::updateModificationTime() {
this->modificationTime = std::chrono::high_resolution_clock::now();
}

24
source/utils/StatList.hpp Normal file
View file

@ -0,0 +1,24 @@
#pragma once
#include <chrono>
#include <list>
/**
* A simple list with additional stat data, such as the date of the latest modification
* @tparam T the type of elements in the list
*/
template<class T>
class StatList : public std::list<T> {
public:
StatList();
void push_back(const T& value);
void pop_back();
[[nodiscard]] std::chrono::time_point<std::chrono::high_resolution_clock> getModificationTime() const;
private:
void updateModificationTime();
std::chrono::time_point<std::chrono::high_resolution_clock> modificationTime;
};