added an id system to the peers

This commit is contained in:
study-faraphel 2024-11-08 16:29:11 +01:00
parent bea5704282
commit 724cad320f
14 changed files with 87 additions and 74 deletions

View file

@ -28,7 +28,7 @@ Ce logiciel est distribué tel quel, sans aucune garantie de quelque nature que
## VI. Glossaire
fork : projet se basant sur le code code source d'un logiciel déjà existant.
fork : projet se basant sur le code source d'un logiciel déjà existant.
---

View file

@ -13,6 +13,9 @@ struct Context {
std::shared_ptr<RemotePeer> server = nullptr;
Peer me;
std::map<sockaddr, std::shared_ptr<RemotePeer>> remotePeers {};
std::map<
std::uint32_t,
std::shared_ptr<RemotePeer>
> remotePeers {};
std::chrono::high_resolution_clock::time_point latestPeerDiscovery;
};

View file

@ -8,6 +8,7 @@
#include <netdb.h>
#include <ostream>
#include <thread>
#include <random>
#include <sys/socket.h>
#include "events/types.hpp"
@ -63,6 +64,19 @@ EventManager::EventManager() {
) != 0)
throw std::runtime_error("[Sender] Could not get the address: " + std::string(gai_strerror(error)));
// generate a random identifier for ourselves
std::random_device randomDevice;
std::mt19937 randomGenerator(randomDevice());
std::uniform_int_distribution<std::uint32_t> distribution(
1,
std::numeric_limits<std::uint32_t>::max()
);
this->context.me.id = distribution(randomGenerator);
// TODO(Faraphel): should only be enabled in specific case.
this->context.me.serverEnabled = true;
// define the time of the latest discovery
this->context.latestPeerDiscovery = std::chrono::high_resolution_clock::now();
}
@ -134,7 +148,7 @@ void EventManager::loopReceiver() {
// TODO(Faraphel): port as argument
addrinfo* senderInfo;
if(getaddrinfo(
nullptr, // hostname
"0.0.0.0", // hostname
"5650", // port
&addressHints,
&senderInfo
@ -186,7 +200,7 @@ void EventManager::loopReceiver() {
event->handle(
this->context,
packetContent,
reinterpret_cast<sockaddr*>(&fromAddress),
fromAddress,
fromAddressLength
);
}

View file

@ -10,9 +10,11 @@
* Contains common information about a certain peer.
*/
struct Peer {
std::uint32_t id = 0;
bool serverEnabled = false;
drp::task::TaskType status = drp::task::TaskType::UNDEFINED;
uint8_t channel = 0;
std::uint8_t channel = 0;
std::chrono::high_resolution_clock::duration latencyAverage = std::chrono::high_resolution_clock::duration::max();
};
@ -23,8 +25,9 @@ struct Peer {
*/
struct RemotePeer {
// communication
sockaddr address {};
sockaddr_storage address {};
socklen_t addressLength = 0;
std::chrono::high_resolution_clock::duration latency = std::chrono::high_resolution_clock::duration::max();
// information

View file

@ -32,8 +32,8 @@ AudioEvent::~AudioEvent() {
void AudioEvent::handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
socklen_t fromAddressLength
const sockaddr_storage& fromAddress,
const socklen_t fromAddressLength
) {
// get the audio data in the content
packet::AudioPacketData audioData;

View file

@ -23,7 +23,7 @@ public:
void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
const sockaddr_storage& fromAddress,
socklen_t fromAddressLength
) override;

View file

@ -13,7 +13,7 @@ public:
virtual void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
const sockaddr_storage& fromAddress,
socklen_t fromAddressLength
) = 0;
};

View file

@ -10,17 +10,21 @@ namespace drp::event {
void InfoEvent::handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
const sockaddr_storage& fromAddress,
const socklen_t fromAddressLength
) {
// check if the peer address is already in the map
std::shared_ptr<RemotePeer> remotePeer;
auto iterator = context.remotePeers.find(*fromAddress);
// get the peer information
Peer peer;
std::memcpy(&peer, &content, sizeof(Peer));
// check if the peer address is already in the map
const auto iterator = context.remotePeers.find(peer.id);
std::shared_ptr<RemotePeer> remotePeer;
if (iterator == context.remotePeers.end()) {
// if not found, create a new peer
remotePeer = std::make_shared<RemotePeer>();
remotePeer->address = *fromAddress;
remotePeer->address = fromAddress;
remotePeer->addressLength = fromAddressLength;
// update the latest discovery time
context.latestPeerDiscovery = std::chrono::high_resolution_clock::now();
@ -29,13 +33,11 @@ void InfoEvent::handle(
remotePeer = iterator->second;
}
// save the remote peer information
std::memcpy(&remotePeer->information, &content, sizeof(Peer));
// TODO(Faraphel): interpret the timestamp and calculate average ping
// save it in the peers list
context.remotePeers[remotePeer->address] = remotePeer;
remotePeer->information = peer;
context.remotePeers[peer.id] = remotePeer;
}

View file

@ -11,7 +11,7 @@ public:
void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
const sockaddr_storage& fromAddress,
socklen_t fromAddressLength
) override;
};

View file

@ -9,8 +9,8 @@ namespace drp::event {
void PongEvent::handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
socklen_t fromAddressLength
const sockaddr_storage& fromAddress,
const socklen_t fromAddressLength
) {
std::cout << "[Receiver] Pong." << std::endl;
}

View file

@ -11,7 +11,7 @@ public:
void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
const sockaddr_storage& fromAddress,
socklen_t fromAddressLength
) override;
};

View file

@ -16,7 +16,7 @@ namespace drp {
void event::SearchEvent::handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
const sockaddr_storage& fromAddress,
const socklen_t fromAddressLength
) {
packet::GenericPacket packet {};
@ -39,7 +39,7 @@ void event::SearchEvent::handle(
&packet,
sizeof(packet),
0,
fromAddress,
reinterpret_cast<const sockaddr*>(&fromAddress),
fromAddressLength
) == -1) {
std::cerr << "[Receiver] Could not send information: " << strerror(errno) << std::endl;

View file

@ -10,7 +10,7 @@ public:
void handle(
Context& context,
const packet::GenericPacketContent& content,
sockaddr* fromAddress,
const sockaddr_storage& fromAddress,
socklen_t fromAddressLength
) override;
};

View file

@ -4,7 +4,10 @@
#include <chrono>
#include <cstring>
#include <iostream>
#include <numeric>
#include <thread>
#include <bits/ranges_util.h>
#include <pstl/glue_numeric_defs.h>
#include <sys/socket.h>
#include "../../Context.hpp"
@ -17,37 +20,13 @@ namespace drp::task {
void UndefinedTask::handle(Context& context) {
// TODO(Faraphel): If .status is UNDEFINED, look for a server.
// if alone, become a server (if can emit).
// if everyone is UNKNOWN, elect a server (easiest to join / highest mac address, etc...)
// if a server is found, become a client.
// check if no more peers have been found.
if (
std::chrono::high_resolution_clock::now() - context.latestPeerDiscovery >
std::chrono::milliseconds(5000)
) {
// verify if there are peers
// TODO(Faraphel): the map is never empty since there is a least the local client
if (context.remotePeers.empty()) {
// if we are alone in the network
// check if we are capable of being a server
if (!context.me.serverEnabled)
return;
// set ourselves as the server
context.server = nullptr;
context.me.status = TaskType::SERVER;
return;
}
// look for a server among the peers
// search if a server is available among the peer.
const auto& server = std::ranges::find_if(
context.remotePeers,
[&](const auto& peer) { return peer.second->information.status == TaskType::SERVER; }
);
// if a server have been found, use it
if (server != context.remotePeers.end()) {
// if a server have been found, use it
context.server = server->second;
@ -55,27 +34,39 @@ void UndefinedTask::handle(Context& context) {
return;
}
// TODO(Faraphel): check if we have the lowest average ping out of all the peers. If yes, become the server.
// Others peers will connect to us on their next loop.
// TODO(Faraphel): use a unique ID instead ?
if (!context.me.serverEnabled)
return;
// check we have the lowest latency out of all the peers
if (std::all_of(
// wait that no more new peers are being discovered
if (
std::chrono::high_resolution_clock::now() - context.latestPeerDiscovery >
std::chrono::milliseconds(5000)
) {
// otherwise, become the server if we have the highest ID.
// TODO(Faraphel): should use the machine with the lowest average ping
if (context.me.serverEnabled) {
// find the remote peer with the highest id that can be a server
const auto serverCandidate = std::max_element(
context.remotePeers.begin(),
context.remotePeers.end(),
[&](const auto& peer) {
return peer.second->information.latencyAverage > context.me.latencyAverage;
[&](auto& remotePeer1, auto& remotePeer2) {
return (
(remotePeer1.second->information.serverEnabled ? remotePeer1.first : 0) <
(remotePeer2.second->information.serverEnabled ? remotePeer2.first : 0)
);
}
)) {
);
// check if we are this peer
if (context.me.id == serverCandidate->first) {
std::cout << "becoming server..." << std::endl;
// set ourselves as the server
context.server = nullptr;
context.server = serverCandidate->second;
context.me.status = TaskType::SERVER;
return;
}
}
// TODO(Faraphel): sleep 1s
}
// prepare a search message
packet::GenericPacket packet {};
packet::GenericPacketContent packetContent {};