M2-PT-DRP/source/behavior/tasks/server/ServerTask.cpp
study-faraphel a7c1bba666 reorganised some parts of the code
Utils moved to subdirectories
Task activation have been moved to static member of the Task class, instead of being manually activated.
2024-11-26 21:57:37 +01:00

142 lines
3.9 KiB
C++

#include "ServerTask.hpp"
#include <iostream>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <mpg123.h>
#include <netdb.h>
#include <stdexcept>
#include <thread>
#include <utility>
#include <sys/socket.h>
#include "packets/audio/AudioPacketData.hpp"
#include "packets/base/Packet.hpp"
#include "packets/base/SecurityMode.hpp"
#include "utils/audio/audio.hpp"
namespace drp::task {
ServerTask::ServerTask() {
this->channels = 0;
this->encoding = 0;
this->sampleRate = 0;
// create a new mpg123 handle
int error;
this->mpgHandle = mpg123_new(nullptr, &error);
if (this->mpgHandle == nullptr)
throw std::runtime_error("[Task - Server] Could not create a mpg123 handle.");
// open the mp3 file
// TODO(Faraphel): mp3 file as argument
if (mpg123_open(
this->mpgHandle,
"./assets/Queen - Another One Bites the Dust.mp3"
) != MPG123_OK)
throw std::runtime_error("[Task - Server] Could not open file.");
// get the format of the file
if (mpg123_getformat(
this->mpgHandle,
&this->sampleRate,
&this->channels,
&this->encoding
) != MPG123_OK)
throw std::runtime_error("[Task - Server] Could not get the format of the file.");
}
void ServerTask::use(Context& context, const std::shared_ptr<RemotePeer>& server) {
context.me.status = TaskType::SERVER;
context.server = server;
}
ServerTask::~ServerTask() {
// delete the mpg123 handle
mpg123_close(this->mpgHandle);
mpg123_delete(this->mpgHandle);
}
void ServerTask::handle(Context& context) {
// TODO(Faraphel): create a chrony server
// get the time of the start of the processing
const auto startProcessingTime = std::chrono::high_resolution_clock::now();
// prepare the packet structure
packet::base::Packet packet {};
packet::base::PacketContent packetContent {};
std::size_t done;
// create a packet
packet::audio::AudioPacketData audioPacket;
packet.channel = 0;
// set the audio settings
audioPacket.channels = this->channels;
audioPacket.sampleFormat = util::encoding_mpg123_to_PulseAudio(this->encoding);
audioPacket.sampleRate = this->sampleRate;
std::vector<std::uint8_t> content(64992);
// read the file
if (mpg123_read(
this->mpgHandle,
content.data(),
content.size(),
&done
) != MPG123_OK) {
std::cerr << "[Task - Server] Could not read audio data from file." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return;
}
// resize the content to fit
content.resize(done);
audioPacket.content = content;
// set the target time
// TODO(Faraphel): dynamically change this delay to be the lowest possible
audioPacket.timePlay =
std::chrono::high_resolution_clock::now() +
std::chrono::milliseconds(5000);
packetContent.eventType = event::EventType::AUDIO;
packetContent.data = audioPacket.serialize();
packet.setContent(context, packet::base::SecurityMode::PLAIN, packetContent);
const auto serializedPacket = packet.serialize();
// broadcast the audio data
if (sendto(
context.socket,
serializedPacket.data(),
serializedPacket.size(),
0,
context.broadcastAddressInfo->ai_addr,
context.broadcastAddressInfo->ai_addrlen
) == -1) {
std::cerr << "[Task - Server] Could not send audio packet: " << strerror(errno) << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return;
}
std::cout << "[Task - Server] Sent: " << done << " bytes" << std::endl;
// wait for the duration of the audio chunk, since the start of the processing
std::this_thread::sleep_until(startProcessingTime + util::get_audio_chunk_duration(
this->sampleRate,
this->channels,
mpg123_encsize(this->encoding),
done
));
}
}