#include "Manager.hpp"

#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cerrno>
#include <chrono>
#include <cstring>
#include <iostream>
#include <limits>
#include <memory>
#include <random>
#include <stdexcept>
#include <string>
#include <thread>

#include "behavior/events/types.hpp"
#include "behavior/events/audio/AudioEvent.hpp"
#include "behavior/events/info/InfoEvent.hpp"
#include "behavior/events/pong/PongEvent.hpp"
#include "behavior/events/search/SearchEvent.hpp"
#include "packets/base/Packet.hpp"
#include "behavior/tasks/client/ClientTask.hpp"
#include "behavior/tasks/server/ServerTask.hpp"
#include "behavior/tasks/undefined/UndefinedTask.hpp"


namespace {

/// Deleter releasing an address list allocated by getaddrinfo().
struct AddrInfoDeleter {
    void operator()(addrinfo* info) const noexcept {
        freeaddrinfo(info);
    }
};

/// Owning handle on an address list returned by getaddrinfo().
using AddrInfoPtr = std::unique_ptr<addrinfo, AddrInfoDeleter>;

/// Closes a socket descriptor on scope exit unless release() has been called.
class SocketGuard {
public:
    explicit SocketGuard(const int socketDescriptor) noexcept : descriptor(socketDescriptor) {}
    ~SocketGuard() {
        if (this->descriptor >= 0)
            ::close(this->descriptor);
    }

    SocketGuard(const SocketGuard&) = delete;
    SocketGuard& operator=(const SocketGuard&) = delete;

    /// Give up the ownership: the descriptor will no longer be closed by the guard.
    void release() noexcept {
        this->descriptor = -1;
    }

private:
    int descriptor;
};

/**
 * Resolve an address with getaddrinfo().
 * @param node the host to resolve, or nullptr for the wildcard address (with AI_PASSIVE).
 * @param port the service / port to resolve.
 * @param hints the resolution hints.
 * @param errorPrefix the text placed before the resolver message in the exception.
 * @return the owned resolved address list.
 * @throws std::runtime_error if the resolution fails.
 */
AddrInfoPtr resolveAddress(
    const char* node,
    const std::string& port,
    const addrinfo& hints,
    const std::string& errorPrefix
) {
    addrinfo* result = nullptr;

    // keep the call and the comparison separate: "error = getaddrinfo(...) != 0" would
    // store the comparison result (0 or 1) and not the real resolver error code.
    const int error = getaddrinfo(node, port.c_str(), &hints, &result);
    if (error != 0)
        throw std::runtime_error(errorPrefix + std::string(gai_strerror(error)));

    return AddrInfoPtr(result);
}

}


/**
 * Create the manager: register the events and tasks, open and bind the UDP socket,
 * resolve the broadcast address and generate a random identifier for this peer.
 * @param address the address used to reach the other peers.
 * @param port the port used both for sending and for binding.
 * @param useIpv6 true to use IPv6, false to use IPv4.
 * @throws std::runtime_error if the socket cannot be created or bound, or if an address
 *         cannot be resolved. Resources acquired before the failure are released.
 */
Manager::Manager(const std::string& address, const std::string& port, const bool useIpv6) {
    std::cout << "Broadcast address: " << address << ":" << port << " (" << (useIpv6 ? "IPv6" : "IPv4") << ")" << std::endl;

    // register the different events type
    // NOTE(review): the concrete event class names / namespace are assumed from the
    // included headers and from drp::event::EventType - confirm.
    this->eventRegistry = {
        {drp::event::EventType::PONG, std::make_shared<drp::event::PongEvent>()},
        {drp::event::EventType::SEARCH, std::make_shared<drp::event::SearchEvent>()},
        {drp::event::EventType::INFO, std::make_shared<drp::event::InfoEvent>()},
        {drp::event::EventType::AUDIO, std::make_shared<drp::event::AudioEvent>()},
    };

    // register the different tasks type
    // NOTE(review): same assumption for the task class names / namespace - confirm.
    this->taskRegistry = {
        {drp::task::TaskType::UNDEFINED, std::make_shared<drp::task::UndefinedTask>()},
        {drp::task::TaskType::CLIENT, std::make_shared<drp::task::ClientTask>()},
        {drp::task::TaskType::SERVER, std::make_shared<drp::task::ServerTask>()},
    };

    // hints for the communication
    addrinfo broadcastAddressHints {};
    broadcastAddressHints.ai_family = useIpv6 ? AF_INET6 : AF_INET;
    broadcastAddressHints.ai_socktype = SOCK_DGRAM;
    broadcastAddressHints.ai_protocol = IPPROTO_UDP;

    // create the client socket
    this->context.socket = socket(
        broadcastAddressHints.ai_family,
        broadcastAddressHints.ai_socktype,
        broadcastAddressHints.ai_protocol
    );
    if (this->context.socket < 0)
        throw std::runtime_error("Could not create the socket: " + std::string(std::strerror(errno)));

    // close the socket again if the rest of the construction throws
    SocketGuard socketGuard(this->context.socket);

    // allow IPv6 multicast loopback so that we can receive our own messages.
    // This option only exists on IPv6 sockets: setting it on an IPv4 socket always fails.
    if (useIpv6) {
        int socketLoopback = 1;
        if (setsockopt(
            this->context.socket,
            IPPROTO_IPV6,
            IPV6_MULTICAST_LOOP,
            &socketLoopback,
            sizeof(socketLoopback)
        ) < 0) {
            std::cerr << "Failed to set IPV6_MULTICAST_LOOP: " << std::strerror(errno) << std::endl;
        }
    }

    // get the information for the broadcast local-link address
    AddrInfoPtr broadcastAddressInfo = resolveAddress(
        address.c_str(),
        port,
        broadcastAddressHints,
        "Could not get the broadcast address: "
    );

    // hints for the bind address
    addrinfo anyAddressHints {};
    anyAddressHints.ai_family = useIpv6 ? AF_INET6 : AF_INET;
    anyAddressHints.ai_flags = AI_PASSIVE;
    anyAddressHints.ai_socktype = SOCK_DGRAM;
    anyAddressHints.ai_protocol = IPPROTO_UDP;

    // get the information for the wildcard ("any") address used for binding.
    // The list is only needed for bind() and is freed when leaving the constructor.
    const AddrInfoPtr anyAddressInfo = resolveAddress(
        nullptr,
        port,
        anyAddressHints,
        "Could not get the any address: "
    );

    // bind the socket to the address
    if (bind(
        this->context.socket,
        anyAddressInfo->ai_addr,
        anyAddressInfo->ai_addrlen
    ) < 0)
        throw std::runtime_error("Could not bind to the address: " + std::string(std::strerror(errno)));

    // generate a random identifier for ourselves
    // NOTE(review): the distribution type is taken from the identifier field itself so the
    // value always fits - confirm it matches the integer type originally intended.
    using PeerId = decltype(this->context.me.id);
    std::random_device randomDevice;
    std::mt19937 randomGenerator(randomDevice());
    std::uniform_int_distribution<PeerId> distribution(
        1,
        std::numeric_limits<PeerId>::max()
    );
    this->context.me.id = distribution(randomGenerator);

    // TODO(Faraphel): should only be enabled in specific case.
    this->context.me.serverEnabled = true;

    // define the time of the latest discovery
    this->context.latestPeerDiscovery = std::chrono::high_resolution_clock::now();

    // nothing can throw anymore: hand the resources over to the context.
    // The broadcast address list is released by Manager::loop().
    this->context.broadcastAddressInfo = broadcastAddressInfo.release();
    socketGuard.release();
}


/**
 * Start the sender and the receiver threads and wait for both of them.
 * The broadcast address list is freed once both threads have been joined.
 */
void Manager::loop() {
    // run an event receiver and sender
    // NOTE(review): both threads use this->context without any synchronization visible
    // in this file - confirm that the shared state is protected elsewhere.
    this->senderThread = std::thread(&Manager::loopSender, this);
    this->receiverThread = std::thread(&Manager::loopReceiver, this);

    this->senderThread.join();
    this->receiverThread.join();

    freeaddrinfo(this->context.broadcastAddressInfo);
}


/**
 * Sender loop: repeatedly run the task registered for the current status of this peer.
 * A status without a registered task is reported on stderr and skipped.
 */
void Manager::loopSender() {
    while (true) {
        std::cout << "[Sender] Handling status: " + std::to_string(static_cast<int>(this->context.me.status)) << std::endl;

        // get the corresponding task class
        const auto taskEntry = this->taskRegistry.find(this->context.me.status);
        if (taskEntry == this->taskRegistry.end()) {
            std::cerr << "[Sender] Unsupported status." << std::endl;
            continue;
        }

        // keep our own reference on the task while it is running
        const auto task = taskEntry->second;

        // ask the task class to handle the task
        task->handle(this->context);
    }
}


/**
 * Receiver loop: read the packets from the socket, ignore those of another channel
 * and dispatch the others to the event registered for their event type.
 * @throws std::runtime_error if recvfrom() fails.
 */
void Manager::loopReceiver() {
    // prepare space for the sender address
    sockaddr_storage fromAddress {};
    socklen_t fromAddressLength = sizeof(fromAddress);

    drp::packet::base::Packet packet {};
    drp::packet::base::PacketContent packetContent {};

    // client loop
    while (true) {
        // the address length is a value-result argument: recvfrom() overwrites it with the
        // size of the sender address, so it must be reset to the buffer size before each call.
        fromAddressLength = sizeof(fromAddress);

        // receive new data
        // NOTE(review): a datagram shorter than sizeof(packet) leaves the end of the buffer
        // with the bytes of the previous packet - confirm whether short packets are expected.
        const ssize_t size = recvfrom(
            this->context.socket,
            &packet,
            sizeof(packet),
            0,
            reinterpret_cast<sockaddr*>(&fromAddress),
            &fromAddressLength
        );
        if (size == -1)
            throw std::runtime_error("[Receiver] Could not receive the packet: " + std::string(std::strerror(errno)));

        // if the packet channel is neither 0 (all) nor the current one, ignore it
        if (packet.channel != 0 && packet.channel != this->context.me.channel)
            continue;

        // decrypt the packet
        packetContent = packet.getContent();

        // get the corresponding event class
        const auto eventEntry = this->eventRegistry.find(
            static_cast<drp::event::EventType>(packetContent.eventType)
        );
        if (eventEntry == this->eventRegistry.end()) {
            std::cerr << "[Receiver] Unsupported event type." << std::endl;
            continue;
        }

        // keep our own reference on the event while it is running
        const auto event = eventEntry->second;

        std::cout << "[Receiver] handling event: " << std::to_string(packetContent.eventType) << std::endl;

        // ask the event class to handle the event
        event->handle(
            this->context,
            packetContent,
            fromAddress,
            fromAddressLength
        );
    }
}