fixed missing include, added context to tasks.

This commit is contained in:
study-faraphel 2024-11-05 14:58:31 +01:00
parent 218ee71118
commit 93f8701ac5
20 changed files with 173 additions and 105 deletions

View file

@ -46,6 +46,8 @@ add_executable(M2-PT-DRP
source/tasks/client/ClientTask.cpp source/tasks/client/ClientTask.cpp
source/tasks/client/ClientTask.hpp source/tasks/client/ClientTask.hpp
source/Context.hpp source/Context.hpp
source/utils/CacheMap.cpp
source/utils/CacheMap.hpp
) )
target_include_directories(M2-PT-DRP PRIVATE target_include_directories(M2-PT-DRP PRIVATE
${MPG123_INCLUDE_DIRS} ${MPG123_INCLUDE_DIRS}

6
README.md Normal file
View file

@ -0,0 +1,6 @@
## Dependencies
apt
```
sudo apt install cmake pkg-config build-essential libmpg123-dev libssl-dev portaudio19-dev
```

View file

@ -1,5 +1,7 @@
#pragma once #pragma once
#include <memory>
#include <netdb.h>
#include "RemotePeer.hpp" #include "RemotePeer.hpp"
#include "utils/StatList.hpp" #include "utils/StatList.hpp"
@ -7,6 +9,7 @@
struct Context { struct Context {
int socket = -1; int socket = -1;
addrinfo* broadcastAddressInfo = nullptr;
drp::util::StatList<std::shared_ptr<RemotePeer>> remotePeers {}; drp::util::StatList<std::shared_ptr<RemotePeer>> remotePeers {};
std::shared_ptr<RemotePeer> server = nullptr; std::shared_ptr<RemotePeer> server = nullptr;

View file

@ -38,7 +38,6 @@ EventManager::EventManager() {
}; };
// hints for the communication // hints for the communication
// TODO: should be merged with the two others addressHints
addrinfo addressHints {}; addrinfo addressHints {};
addressHints.ai_family = AF_INET; // TODO: AF_INET6 addressHints.ai_family = AF_INET; // TODO: AF_INET6
addressHints.ai_socktype = SOCK_DGRAM; addressHints.ai_socktype = SOCK_DGRAM;
@ -53,6 +52,17 @@ EventManager::EventManager() {
if (this->context.socket < 0) if (this->context.socket < 0)
throw std::runtime_error("[Receiver] Could not create the socket: " + std::string(strerror(errno))); throw std::runtime_error("[Receiver] Could not create the socket: " + std::string(strerror(errno)));
// get the information for the broadcast local-link address
// TODO(Faraphel): ip / port as argument ?
if(const int error = getaddrinfo(
"localhost", // TODO: ff02::1
"5650",
&addressHints,
&context.broadcastAddressInfo
) != 0)
throw std::runtime_error("[Sender] Could not get the address: " + std::string(gai_strerror(error)));
} }
@ -63,6 +73,8 @@ void EventManager::loop() {
this->senderThread.join(); this->senderThread.join();
this->receiverThread.join(); this->receiverThread.join();
freeaddrinfo(this->context.broadcastAddressInfo);
} }
@ -96,7 +108,7 @@ void EventManager::loopSender() {
} }
// ask the task class to handle the task // ask the task class to handle the task
task->handle(); task->handle(this->context);
// wait a second // wait a second
// TODO(Faraphel): might be moved to the tasks directly ? what if they want a lower cooldown ? // TODO(Faraphel): might be moved to the tasks directly ? what if they want a lower cooldown ?

View file

@ -1,17 +1,14 @@
#pragma once #pragma once
#include <cstdint>
#include <map> #include <map>
#include <netdb.h> #include <memory>
#include <thread> #include <thread>
#include "Context.hpp" #include "Context.hpp"
#include "RemotePeer.hpp"
#include "events/types.hpp" #include "events/types.hpp"
#include "events/base/BaseEvent.hpp" #include "events/base/BaseEvent.hpp"
#include "tasks/types.hpp" #include "tasks/types.hpp"
#include "tasks/base/BaseTask.hpp" #include "tasks/base/BaseTask.hpp"
#include "utils/StatList.hpp"
class EventManager { class EventManager {

View file

@ -12,6 +12,8 @@ struct Peer {
bool serverEnabled = false; bool serverEnabled = false;
drp::task::TaskType status = drp::task::TaskType::UNDEFINED; drp::task::TaskType status = drp::task::TaskType::UNDEFINED;
uint8_t channel = 0; uint8_t channel = 0;
std::chrono::high_resolution_clock::duration latencyAverage = std::chrono::high_resolution_clock::duration::max();
}; };
@ -22,6 +24,7 @@ struct RemotePeer {
// communication // communication
sockaddr address {}; sockaddr address {};
socklen_t addressLength = 0; socklen_t addressLength = 0;
std::chrono::high_resolution_clock::duration latency = std::chrono::high_resolution_clock::duration::max();
// information // information
Peer information; Peer information;

View file

@ -98,7 +98,8 @@ void AudioEvent::loopPlay() {
// wait until it must be played // wait until it must be played
std::this_thread::sleep_until(audioPacket.timePlay); std::this_thread::sleep_until(audioPacket.timePlay);
std::cout << "[Client] Playing: " << audioPacket.timePlay << std::endl; auto cTimePlay = std::chrono::high_resolution_clock::to_time_t(audioPacket.timePlay);
std::cout << "[Client] Playing: " << std::ctime(&cTimePlay) << std::endl;
// immediately stop playing music // immediately stop playing music
// this avoids an offset created if this client's clock is too ahead of the others // this avoids an offset created if this client's clock is too ahead of the others

View file

@ -1,9 +1,8 @@
#include "InfoEvent.hpp" #include "InfoEvent.hpp"
#include <cstring>
#include <sys/socket.h> #include <sys/socket.h>
#include "../../tasks/types.hpp"
namespace drp::event { namespace drp::event {
@ -16,11 +15,13 @@ void InfoEvent::handle(
) { ) {
const auto remotePeer = std::make_shared<RemotePeer>(); const auto remotePeer = std::make_shared<RemotePeer>();
// TODO(Faraphel): first check if the peer is already in the list
// do not update the time of the list if yes !
// save the remote peer information
remotePeer->address = *reinterpret_cast<sockaddr*>(&fromAddress); remotePeer->address = *reinterpret_cast<sockaddr*>(&fromAddress);
remotePeer->addressLength = fromAddressLength; remotePeer->addressLength = fromAddressLength;
std::memcpy(&remotePeer->information, &content, sizeof(Peer));
// TODO(Faraphel): convert the memory from content to Peer.
remotePeer->information = content;
// save it in the peers list // save it in the peers list
context.remotePeers.push_back(remotePeer); context.remotePeers.push_back(remotePeer);

View file

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <array>
#include <chrono> #include <chrono>
#include <cstdint> #include <cstdint>

View file

@ -1 +1,2 @@
This directory contains the code describing how should the machine send event to its peers. This directory contains the code describing how should the machine send event to its peers.
TODO(Faraphel): rename "roles" ?

View file

@ -1,5 +1,8 @@
#pragma once #pragma once
#include "../../Context.hpp"
namespace drp::task { namespace drp::task {
@ -7,7 +10,7 @@ class BaseTask {
public: public:
virtual ~BaseTask() = default; virtual ~BaseTask() = default;
virtual void handle() = 0; virtual void handle(Context& context) = 0;
}; };

View file

@ -1,9 +1,10 @@
#include "ClientTask.hpp" #include "ClientTask.hpp"
namespace drp::task { namespace drp::task {
void ClientTask::handle() { void ClientTask::handle(Context& context) {
// TODO(Faraphel): connect to an ntp server // TODO(Faraphel): connect to an ntp server
// TODO(Faraphel): check if the server is still reachable. // TODO(Faraphel): check if the server is still reachable.

View file

@ -8,7 +8,7 @@ namespace drp::task {
class ClientTask : public BaseTask { class ClientTask : public BaseTask {
public: public:
void handle() override; void handle(Context& context) override;
}; };

View file

@ -32,7 +32,6 @@ ServerTask::ServerTask() {
// TODO(Faraphel): mp3 file as argument // TODO(Faraphel): mp3 file as argument
if (mpg123_open( if (mpg123_open(
this->mpgHandle, this->mpgHandle,
// "./assets/Caravan Palace - Wonderland.mp3"
"./assets/Queen - Another One Bites the Dust.mp3" "./assets/Queen - Another One Bites the Dust.mp3"
) != MPG123_OK) ) != MPG123_OK)
throw std::runtime_error("[Server] Could not open file."); throw std::runtime_error("[Server] Could not open file.");
@ -53,43 +52,23 @@ ServerTask::~ServerTask() {
mpg123_delete(this->mpgHandle); mpg123_delete(this->mpgHandle);
} }
void ServerTask::loop() const { void ServerTask::handle(Context& context) {
// TODO(Faraphel): create a chrony server // TODO(Faraphel): create a chrony server
// get the broadcast address
addrinfo broadcastHints {};
broadcastHints.ai_family = AF_INET6;
broadcastHints.ai_socktype = SOCK_DGRAM;
broadcastHints.ai_protocol = IPPROTO_UDP;
// TODO(Faraphel): ip / port as argument
addrinfo *broadcastInfo;
if(const int error = getaddrinfo(
"::1",
"5650",
&broadcastHints,
&broadcastInfo
) != 0)
throw std::runtime_error("[Server] Could not get the address: " + std::string(gai_strerror(error)));
const int broadcastSocket = socket(
broadcastInfo->ai_family,
broadcastInfo->ai_socktype,
broadcastInfo->ai_protocol
);
if (broadcastSocket == -1)
throw std::runtime_error("[Server] Could not create the socket: " + std::string(gai_strerror(errno)));
// read the file // read the file
packet::AudioPacketData audioPacket; packet::AudioPacketData audioPacket;
std::size_t done; std::size_t done;
while (mpg123_read( if (mpg123_read(
this->mpgHandle, this->mpgHandle,
&audioPacket.content, &audioPacket.content,
std::size(audioPacket.content), std::size(audioPacket.content),
&done &done
) == MPG123_OK) { ) != MPG123_OK) {
std::cerr << "[Server] Could not read audio data from file." << std::endl;
return;
}
// set the target time // set the target time
// TODO(Faraphel): dynamically change this delay to be the lowest possible // TODO(Faraphel): dynamically change this delay to be the lowest possible
audioPacket.timePlay = audioPacket.timePlay =
@ -106,30 +85,26 @@ void ServerTask::loop() const {
// broadcast the audio data // broadcast the audio data
if (sendto( if (sendto(
broadcastSocket, context.socket,
&audioPacket, &audioPacket,
sizeof(audioPacket), sizeof(audioPacket),
0, 0,
broadcastInfo->ai_addr, context.broadcastAddressInfo->ai_addr,
broadcastInfo->ai_addrlen context.broadcastAddressInfo->ai_addrlen
) == -1) { ) == -1) {
std::cerr << "[Server] Could not send audio packet: " << strerror(errno) << std::endl; std::cerr << "[Server] Could not send audio packet: " << strerror(errno) << std::endl;
continue; return;
} }
std::cout << "[Server] Sent: " << done << " bytes" << std::endl; std::cout << "[Server] Sent: " << done << " bytes" << std::endl;
// wait for the duration of the audio chunk // wait for the duration of the audio chunk
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<uint64_t>( std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<std::uint64_t>(
(1 / static_cast<double>(this->sampleRate * this->channels * mpg123_encsize(this->encoding))) * (1 / static_cast<double>(this->sampleRate * this->channels * mpg123_encsize(this->encoding))) *
1000 * 1000 *
static_cast<double>(done) static_cast<double>(done)
))); )));
} }
// free the server address
freeaddrinfo(broadcastInfo);
}
} }

View file

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <mpg123.h> #include <mpg123.h>
#include <netdb.h>
#include "../base/BaseTask.hpp" #include "../base/BaseTask.hpp"
@ -16,11 +17,10 @@ public:
explicit ServerTask(); explicit ServerTask();
~ServerTask(); ~ServerTask();
void handle() override; void handle(Context& context) override;
private: private:
mpg123_handle* mpgHandle; mpg123_handle* mpgHandle;
long sampleRate; long sampleRate;
int channels; int channels;
int encoding; int encoding;

View file

@ -6,6 +6,7 @@
#include <iostream> #include <iostream>
#include <sys/socket.h> #include <sys/socket.h>
#include "../../Context.hpp"
#include "../../events/types.hpp" #include "../../events/types.hpp"
#include "../../packets/base/GenericPacket.hpp" #include "../../packets/base/GenericPacket.hpp"
@ -13,7 +14,7 @@
namespace drp::task { namespace drp::task {
void UndefinedTask::handle() { void UndefinedTask::handle(Context& context) {
// TODO(Faraphel): If .status is UNDEFINED, look for a server. // TODO(Faraphel): If .status is UNDEFINED, look for a server.
// if alone, become a server (if can emit). // if alone, become a server (if can emit).
// if everyone is UNKNOWN, elect a server (easiest to join / highest mac address, etc...) // if everyone is UNKNOWN, elect a server (easiest to join / highest mac address, etc...)
@ -21,54 +22,63 @@ void UndefinedTask::handle() {
// check if no more peers have been found. // check if no more peers have been found.
if ( if (
std::chrono::high_resolution_clock::now() - this->peers.getModificationTime() > std::chrono::high_resolution_clock::now() - context.remotePeers.getModificationTime() >
std::chrono::milliseconds(5000) std::chrono::milliseconds(5000)
) { ) {
// verify if there are peers // verify if there are peers
if (this->peers.empty()) { if (context.remotePeers.empty()) {
// if we are alone in the network // if we are alone in the network
// check if we are capable of being a server // check if we are capable of being a server
if (!canEmit()) if (!context.me.serverEnabled)
return; return;
// set ourselves as the server // set ourselves as the server
this->status = TaskType::SERVER; context.server = nullptr;
context.me.status = TaskType::SERVER;
return; return;
} }
// search for a server among the peers // search for a server among the peers
const auto& server = std::find_if( const auto& server = std::find_if(
this->peers.begin(), context.remotePeers.begin(),
this->peers.end(), context.remotePeers.end(),
[&](const Peer& peer) { return peer.status == TaskType::SERVER; } [&](const std::shared_ptr<Peer>& peer) { return peer->status == TaskType::SERVER; }
); );
if (server != this->peers.end()) { if (server != context.remotePeers.end()) {
// if a server have been found, use it // if a server have been found, use it
this->server = *server; context.server = *server;
this->status = TaskType::CLIENT; context.me.status = TaskType::CLIENT;
return; return;
} }
// TODO(Faraphel): elect a server among those capable of emitting. // TODO(Faraphel): elect a server among those capable of emitting.
// send the average ping of all the machine in the information packet ?
// add an additionnal random value if equals ?
} }
// prepare a search message // prepare a search message
packet::GenericPacket packet {}; packet::GenericPacket packet {};
packet::GenericPacketContent packetContent {};
packet.channel = 0; packet.channel = 0;
packet.securityMode = static_cast<std::uint8_t>(packet::SecurityMode::PLAIN); packet.securityMode = static_cast<std::uint8_t>(packet::SecurityMode::PLAIN);
packet._content.eventType = static_cast<std::uint8_t>(event::EventType::SEARCH);
packetContent.eventType = static_cast<std::uint8_t>(event::EventType::SEARCH);
// TODO(Faraphel): generate a random broadcast code and put it in the packet.
// when sending the response to this message, include this broadcast code.
// it will allow us to determinate an estimation of the ping of the machine
// will help when electing a server.
// Bonus: when electing a server, define a random value in the constructor to send
// to every election, allowing for distinction of machines
packet.setContent(packetContent);
// send the search message // send the search message
if (sendto( if (sendto(
this->eventSocket, context.socket,
&packet, &packet,
sizeof(packet), sizeof(packet),
0, 0,
destinationInfo->ai_addr, context.broadcastAddressInfo->ai_addr,
destinationInfo->ai_addrlen context.broadcastAddressInfo->ai_addrlen
) == -1) ) == -1)
std::cerr << "[Sender] Could not send search event: " << strerror(errno) << std::endl; std::cerr << "[Sender] Could not send search event: " << strerror(errno) << std::endl;
} }

View file

@ -1,12 +1,28 @@
#pragma once #pragma once
#include <map>
#include "../base/BaseTask.hpp" #include "../base/BaseTask.hpp"
namespace drp::task { namespace drp::task {
class UndefinedTask : public BaseTask { class UndefinedTask final : public BaseTask {
void handle() override; public:
void handle(Context& context) override;
private:
std::uniform_int_distribution(
std::numeric_limits<std::uint16_t>::min(),
std::numeric_limits<std::uint16_t>::max()
) broadcastCodeGenerator;
util::CacheMap<
uint32_t,
std::chrono::high_resolution_clock::time_point,
128
> broadcastCode;
}; };

View file

@ -0,0 +1 @@
#include "CacheMap.hpp"

35
source/utils/CacheMap.hpp Normal file
View file

@ -0,0 +1,35 @@
#pragma once
#include <functional>
#include <map>
namespace drp::util {
template<class Key, class Value, int limit, class Comparator = std::less<Value>>
class CacheMap : std::map<Key, Value> {
public:
void cache() {
// check if the limit have been reached.
if (this->size() <= limit)
return;
// apply the comparator on all the value to find the minimum.
auto minimum = this->begin();
for (const auto& it : this)
if (Comparator()(it.second, minimum))
minimum = it;
// delete this lowest value.
this->erase(minimum);
}
Value& operator[](const Key& key) {
const auto& value = std::map<Key, Value>::operator[](key);
this->cache();
return value;
}
};
}

View file

@ -9,14 +9,14 @@ namespace drp::util {
/** /**
* A simple list with additional stat data, such as the date of the latest modification * A simple list with additional stat data, such as the date of the latest modification
* @tparam T the type of elements in the list * @tparam Value the type of elements in the list
*/ */
template<class T> template<class Value>
class StatList : public std::list<T> { class StatList : public std::list<Value> {
public: public:
StatList(); StatList();
void push_back(const T& value); void push_back(const Value& value);
void pop_back(); void pop_back();
[[nodiscard]] std::chrono::time_point<std::chrono::high_resolution_clock> getModificationTime() const; [[nodiscard]] std::chrono::time_point<std::chrono::high_resolution_clock> getModificationTime() const;