diff --git a/CMakeLists.txt b/CMakeLists.txt index ca210f9..d880c34 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,6 +46,8 @@ add_executable(M2-PT-DRP source/tasks/client/ClientTask.cpp source/tasks/client/ClientTask.hpp source/Context.hpp + source/utils/CacheMap.cpp + source/utils/CacheMap.hpp ) target_include_directories(M2-PT-DRP PRIVATE ${MPG123_INCLUDE_DIRS} diff --git a/README.md b/README.md new file mode 100644 index 0000000..5fbc7ad --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +## Dependencies + +apt +``` +sudo apt install cmake pkg-config build-essential libmpg123-dev libssl-dev portaudio19-dev +``` diff --git a/source/Context.hpp b/source/Context.hpp index 05d3b13..6ac1285 100644 --- a/source/Context.hpp +++ b/source/Context.hpp @@ -1,5 +1,7 @@ #pragma once +#include +#include #include "RemotePeer.hpp" #include "utils/StatList.hpp" @@ -7,6 +9,7 @@ struct Context { int socket = -1; + addrinfo* broadcastAddressInfo = nullptr; drp::util::StatList> remotePeers {}; std::shared_ptr server = nullptr; diff --git a/source/EventManager.cpp b/source/EventManager.cpp index 4fd4bae..691e44a 100644 --- a/source/EventManager.cpp +++ b/source/EventManager.cpp @@ -38,7 +38,6 @@ EventManager::EventManager() { }; // hints for the communication - // TODO: should be merged with the two others addressHints addrinfo addressHints {}; addressHints.ai_family = AF_INET; // TODO: AF_INET6 addressHints.ai_socktype = SOCK_DGRAM; @@ -53,6 +52,17 @@ EventManager::EventManager() { if (this->context.socket < 0) throw std::runtime_error("[Receiver] Could not create the socket: " + std::string(strerror(errno))); + + // get the information for the broadcast local-link address + // TODO(Faraphel): ip / port as argument ? + if(const int error = getaddrinfo( + "localhost", // TODO: ff02::1 + "5650", + &addressHints, + &context.broadcastAddressInfo + ) != 0) + throw std::runtime_error("[Sender] Could not get the address: " + std::string(gai_strerror(error))); + } @@ -63,6 +73,8 @@ void EventManager::loop() { this->senderThread.join(); this->receiverThread.join(); + + freeaddrinfo(this->context.broadcastAddressInfo); } @@ -96,7 +108,7 @@ void EventManager::loopSender() { } // ask the task class to handle the task - task->handle(); + task->handle(this->context); // wait a second // TODO(Faraphel): might be moved to the tasks directly ? what if they want a lower cooldown ? diff --git a/source/EventManager.hpp b/source/EventManager.hpp index 3a17415..cfe4479 100644 --- a/source/EventManager.hpp +++ b/source/EventManager.hpp @@ -1,17 +1,14 @@ #pragma once -#include #include -#include +#include #include #include "Context.hpp" -#include "RemotePeer.hpp" #include "events/types.hpp" #include "events/base/BaseEvent.hpp" #include "tasks/types.hpp" #include "tasks/base/BaseTask.hpp" -#include "utils/StatList.hpp" class EventManager { diff --git a/source/RemotePeer.hpp b/source/RemotePeer.hpp index 977ad06..74bf839 100644 --- a/source/RemotePeer.hpp +++ b/source/RemotePeer.hpp @@ -12,6 +12,8 @@ struct Peer { bool serverEnabled = false; drp::task::TaskType status = drp::task::TaskType::UNDEFINED; uint8_t channel = 0; + + std::chrono::high_resolution_clock::duration latencyAverage = std::chrono::high_resolution_clock::duration::max(); }; @@ -22,6 +24,7 @@ struct RemotePeer { // communication sockaddr address {}; socklen_t addressLength = 0; + std::chrono::high_resolution_clock::duration latency = std::chrono::high_resolution_clock::duration::max(); // information Peer information; diff --git a/source/events/audio/AudioEvent.cpp b/source/events/audio/AudioEvent.cpp index 5f97dbe..2fc81f5 100644 --- a/source/events/audio/AudioEvent.cpp +++ b/source/events/audio/AudioEvent.cpp @@ -98,7 +98,8 @@ void AudioEvent::loopPlay() { // wait until it must be played std::this_thread::sleep_until(audioPacket.timePlay); - std::cout << "[Client] Playing: " << audioPacket.timePlay << std::endl; + auto cTimePlay = std::chrono::high_resolution_clock::to_time_t(audioPacket.timePlay); + std::cout << "[Client] Playing: " << std::ctime(&cTimePlay) << std::endl; // immediately stop playing music // this avoids an offset created if this client's clock is too ahead of the others diff --git a/source/events/info/InfoEvent.cpp b/source/events/info/InfoEvent.cpp index 6d839ed..85a6478 100644 --- a/source/events/info/InfoEvent.cpp +++ b/source/events/info/InfoEvent.cpp @@ -1,9 +1,8 @@ #include "InfoEvent.hpp" +#include #include -#include "../../tasks/types.hpp" - namespace drp::event { @@ -16,11 +15,13 @@ void InfoEvent::handle( ) { const auto remotePeer = std::make_shared(); + // TODO(Faraphel): first check if the peer is already in the list + // do not update the time of the list if yes ! + + // save the remote peer information remotePeer->address = *reinterpret_cast(&fromAddress); remotePeer->addressLength = fromAddressLength; - - // TODO(Faraphel): convert the memory from content to Peer. - remotePeer->information = content; + std::memcpy(&remotePeer->information, &content, sizeof(Peer)); // save it in the peers list context.remotePeers.push_back(remotePeer); diff --git a/source/packets/audio/AudioPacketData.hpp b/source/packets/audio/AudioPacketData.hpp index b354d8a..d2beee9 100644 --- a/source/packets/audio/AudioPacketData.hpp +++ b/source/packets/audio/AudioPacketData.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include diff --git a/source/tasks/README.md b/source/tasks/README.md index 87c0324..cdf1c27 100644 --- a/source/tasks/README.md +++ b/source/tasks/README.md @@ -1 +1,2 @@ This directory contains the code describing how should the machine send event to its peers. +TODO(Faraphel): rename "roles" ? diff --git a/source/tasks/base/BaseTask.hpp b/source/tasks/base/BaseTask.hpp index bea4a37..f4d5688 100644 --- a/source/tasks/base/BaseTask.hpp +++ b/source/tasks/base/BaseTask.hpp @@ -1,5 +1,8 @@ #pragma once +#include "../../Context.hpp" + + namespace drp::task { @@ -7,7 +10,7 @@ class BaseTask { public: virtual ~BaseTask() = default; - virtual void handle() = 0; + virtual void handle(Context& context) = 0; }; diff --git a/source/tasks/client/ClientTask.cpp b/source/tasks/client/ClientTask.cpp index c59bef7..0e71ad4 100644 --- a/source/tasks/client/ClientTask.cpp +++ b/source/tasks/client/ClientTask.cpp @@ -1,9 +1,10 @@ #include "ClientTask.hpp" + namespace drp::task { -void ClientTask::handle() { +void ClientTask::handle(Context& context) { // TODO(Faraphel): connect to an ntp server // TODO(Faraphel): check if the server is still reachable. diff --git a/source/tasks/client/ClientTask.hpp b/source/tasks/client/ClientTask.hpp index d04c3f4..28c606e 100644 --- a/source/tasks/client/ClientTask.hpp +++ b/source/tasks/client/ClientTask.hpp @@ -8,7 +8,7 @@ namespace drp::task { class ClientTask : public BaseTask { public: - void handle() override; + void handle(Context& context) override; }; diff --git a/source/tasks/server/ServerTask.cpp b/source/tasks/server/ServerTask.cpp index 768c00b..964790b 100644 --- a/source/tasks/server/ServerTask.cpp +++ b/source/tasks/server/ServerTask.cpp @@ -32,7 +32,6 @@ ServerTask::ServerTask() { // TODO(Faraphel): mp3 file as argument if (mpg123_open( this->mpgHandle, - // "./assets/Caravan Palace - Wonderland.mp3" "./assets/Queen - Another One Bites the Dust.mp3" ) != MPG123_OK) throw std::runtime_error("[Server] Could not open file."); @@ -53,82 +52,58 @@ ServerTask::~ServerTask() { mpg123_delete(this->mpgHandle); } -void ServerTask::loop() const { +void ServerTask::handle(Context& context) { // TODO(Faraphel): create a chrony server - // get the broadcast address - addrinfo broadcastHints {}; - broadcastHints.ai_family = AF_INET6; - broadcastHints.ai_socktype = SOCK_DGRAM; - broadcastHints.ai_protocol = IPPROTO_UDP; - - // TODO(Faraphel): ip / port as argument - addrinfo *broadcastInfo; - if(const int error = getaddrinfo( - "::1", - "5650", - &broadcastHints, - &broadcastInfo - ) != 0) - throw std::runtime_error("[Server] Could not get the address: " + std::string(gai_strerror(error))); - - const int broadcastSocket = socket( - broadcastInfo->ai_family, - broadcastInfo->ai_socktype, - broadcastInfo->ai_protocol - ); - if (broadcastSocket == -1) - throw std::runtime_error("[Server] Could not create the socket: " + std::string(gai_strerror(errno))); - // read the file packet::AudioPacketData audioPacket; std::size_t done; - while (mpg123_read( + if (mpg123_read( this->mpgHandle, &audioPacket.content, std::size(audioPacket.content), &done - ) == MPG123_OK) { - // set the target time - // TODO(Faraphel): dynamically change this delay to be the lowest possible - audioPacket.timePlay = - std::chrono::high_resolution_clock::now() + - std::chrono::milliseconds(5000); - - // set the audio settings - audioPacket.channels = this->channels; - audioPacket.sampleFormat = util::encoding_mpg123_to_PulseAudio(this->encoding); - audioPacket.sampleRate = this->sampleRate; - - // set the size of the content - audioPacket.contentSize = done; - - // broadcast the audio data - if (sendto( - broadcastSocket, - &audioPacket, - sizeof(audioPacket), - 0, - broadcastInfo->ai_addr, - broadcastInfo->ai_addrlen - ) == -1) { - std::cerr << "[Server] Could not send audio packet: " << strerror(errno) << std::endl; - continue; - } - - std::cout << "[Server] Sent: " << done << " bytes" << std::endl; - - // wait for the duration of the audio chunk - std::this_thread::sleep_for(std::chrono::milliseconds(static_cast( - (1 / static_cast(this->sampleRate * this->channels * mpg123_encsize(this->encoding))) * - 1000 * - static_cast(done) - ))); + ) != MPG123_OK) { + std::cerr << "[Server] Could not read audio data from file." << std::endl; + return; } - // free the server address - freeaddrinfo(broadcastInfo); + // set the target time + // TODO(Faraphel): dynamically change this delay to be the lowest possible + audioPacket.timePlay = + std::chrono::high_resolution_clock::now() + + std::chrono::milliseconds(5000); + + // set the audio settings + audioPacket.channels = this->channels; + audioPacket.sampleFormat = util::encoding_mpg123_to_PulseAudio(this->encoding); + audioPacket.sampleRate = this->sampleRate; + + // set the size of the content + audioPacket.contentSize = done; + + // broadcast the audio data + if (sendto( + context.socket, + &audioPacket, + sizeof(audioPacket), + 0, + context.broadcastAddressInfo->ai_addr, + context.broadcastAddressInfo->ai_addrlen + ) == -1) { + std::cerr << "[Server] Could not send audio packet: " << strerror(errno) << std::endl; + return; + } + + std::cout << "[Server] Sent: " << done << " bytes" << std::endl; + + // wait for the duration of the audio chunk + std::this_thread::sleep_for(std::chrono::milliseconds(static_cast( + (1 / static_cast(this->sampleRate * this->channels * mpg123_encsize(this->encoding))) * + 1000 * + static_cast(done) + ))); } diff --git a/source/tasks/server/ServerTask.hpp b/source/tasks/server/ServerTask.hpp index a2777b6..a9c9251 100644 --- a/source/tasks/server/ServerTask.hpp +++ b/source/tasks/server/ServerTask.hpp @@ -1,5 +1,6 @@ #pragma once #include +#include #include "../base/BaseTask.hpp" @@ -16,11 +17,10 @@ public: explicit ServerTask(); ~ServerTask(); - void handle() override; + void handle(Context& context) override; private: mpg123_handle* mpgHandle; - long sampleRate; int channels; int encoding; diff --git a/source/tasks/undefined/UndefinedTask.cpp b/source/tasks/undefined/UndefinedTask.cpp index c4caa62..2978aa4 100644 --- a/source/tasks/undefined/UndefinedTask.cpp +++ b/source/tasks/undefined/UndefinedTask.cpp @@ -6,6 +6,7 @@ #include #include +#include "../../Context.hpp" #include "../../events/types.hpp" #include "../../packets/base/GenericPacket.hpp" @@ -13,7 +14,7 @@ namespace drp::task { -void UndefinedTask::handle() { +void UndefinedTask::handle(Context& context) { // TODO(Faraphel): If .status is UNDEFINED, look for a server. // if alone, become a server (if can emit). // if everyone is UNKNOWN, elect a server (easiest to join / highest mac address, etc...) @@ -21,54 +22,63 @@ void UndefinedTask::handle() { // check if no more peers have been found. if ( - std::chrono::high_resolution_clock::now() - this->peers.getModificationTime() > + std::chrono::high_resolution_clock::now() - context.remotePeers.getModificationTime() > std::chrono::milliseconds(5000) ) { // verify if there are peers - if (this->peers.empty()) { + if (context.remotePeers.empty()) { // if we are alone in the network // check if we are capable of being a server - if (!canEmit()) + if (!context.me.serverEnabled) return; // set ourselves as the server - this->status = TaskType::SERVER; + context.server = nullptr; + context.me.status = TaskType::SERVER; return; } // search for a server among the peers const auto& server = std::find_if( - this->peers.begin(), - this->peers.end(), - [&](const Peer& peer) { return peer.status == TaskType::SERVER; } + context.remotePeers.begin(), + context.remotePeers.end(), + [&](const std::shared_ptr& peer) { return peer->status == TaskType::SERVER; } ); - if (server != this->peers.end()) { + if (server != context.remotePeers.end()) { // if a server have been found, use it - this->server = *server; - this->status = TaskType::CLIENT; + context.server = *server; + context.me.status = TaskType::CLIENT; return; } // TODO(Faraphel): elect a server among those capable of emitting. - // send the average ping of all the machine in the information packet ? - // add an additionnal random value if equals ? } // prepare a search message packet::GenericPacket packet {}; + packet::GenericPacketContent packetContent {}; + packet.channel = 0; packet.securityMode = static_cast(packet::SecurityMode::PLAIN); - packet._content.eventType = static_cast(event::EventType::SEARCH); + + packetContent.eventType = static_cast(event::EventType::SEARCH); + // TODO(Faraphel): generate a random broadcast code and put it in the packet. + // when sending the response to this message, include this broadcast code. + // it will allow us to determinate an estimation of the ping of the machine + // will help when electing a server. + // Bonus: when electing a server, define a random value in the constructor to send + // to every election, allowing for distinction of machines + packet.setContent(packetContent); // send the search message if (sendto( - this->eventSocket, + context.socket, &packet, sizeof(packet), 0, - destinationInfo->ai_addr, - destinationInfo->ai_addrlen + context.broadcastAddressInfo->ai_addr, + context.broadcastAddressInfo->ai_addrlen ) == -1) std::cerr << "[Sender] Could not send search event: " << strerror(errno) << std::endl; } diff --git a/source/tasks/undefined/UndefinedTask.hpp b/source/tasks/undefined/UndefinedTask.hpp index 802df33..9305a53 100644 --- a/source/tasks/undefined/UndefinedTask.hpp +++ b/source/tasks/undefined/UndefinedTask.hpp @@ -1,12 +1,28 @@ #pragma once +#include + #include "../base/BaseTask.hpp" namespace drp::task { -class UndefinedTask : public BaseTask { - void handle() override; +class UndefinedTask final : public BaseTask { +public: + void handle(Context& context) override; + +private: + std::uniform_int_distribution( + std::numeric_limits::min(), + std::numeric_limits::max() + ) broadcastCodeGenerator; + + util::CacheMap< + uint32_t, + std::chrono::high_resolution_clock::time_point, + 128 + > broadcastCode; + }; diff --git a/source/utils/CacheMap.cpp b/source/utils/CacheMap.cpp new file mode 100644 index 0000000..c408636 --- /dev/null +++ b/source/utils/CacheMap.cpp @@ -0,0 +1 @@ +#include "CacheMap.hpp" diff --git a/source/utils/CacheMap.hpp b/source/utils/CacheMap.hpp new file mode 100644 index 0000000..9d980ea --- /dev/null +++ b/source/utils/CacheMap.hpp @@ -0,0 +1,35 @@ +#pragma once +#include +#include + + +namespace drp::util { + + +template> +class CacheMap : std::map { +public: + void cache() { + // check if the limit have been reached. + if (this->size() <= limit) + return; + + // apply the comparator on all the value to find the minimum. + auto minimum = this->begin(); + for (const auto& it : this) + if (Comparator()(it.second, minimum)) + minimum = it; + + // delete this lowest value. + this->erase(minimum); + } + + Value& operator[](const Key& key) { + const auto& value = std::map::operator[](key); + this->cache(); + return value; + } +}; + + +} diff --git a/source/utils/StatList.hpp b/source/utils/StatList.hpp index b21b0a8..43c97da 100644 --- a/source/utils/StatList.hpp +++ b/source/utils/StatList.hpp @@ -9,14 +9,14 @@ namespace drp::util { /** * A simple list with additional stat data, such as the date of the latest modification - * @tparam T the type of elements in the list + * @tparam Value the type of elements in the list */ -template -class StatList : public std::list { +template +class StatList : public std::list { public: StatList(); - void push_back(const T& value); + void push_back(const Value& value); void pop_back(); [[nodiscard]] std::chrono::time_point getModificationTime() const;