From e12073ede0a8d3d9880246ad259ff34c56c3679f Mon Sep 17 00:00:00 2001 From: study-faraphel Date: Sun, 2 Feb 2025 15:02:36 +0100 Subject: [PATCH] latency is now calculated based on the network latency --- source/behaviors/events/PeerEvent.py | 9 ++++++- .../behaviors/events/base/BaseTrustedEvent.py | 4 --- source/behaviors/roles/MasterRole.py | 24 +++++++++++++++++- source/managers/CommunicationManager.py | 21 +++++++++++++--- source/structures/Peer.py | 25 +++++++++++++++++-- 5 files changed, 72 insertions(+), 11 deletions(-) diff --git a/source/behaviors/events/PeerEvent.py b/source/behaviors/events/PeerEvent.py index 7ad335c..402f5ff 100644 --- a/source/behaviors/events/PeerEvent.py +++ b/source/behaviors/events/PeerEvent.py @@ -13,8 +13,15 @@ class PeerEvent(base.BaseEvent): return # update our peers database to add new peer information - self.manager.peer.peers[address] = structures.Peer( + peer = structures.Peer( public_key=packet.public_key, allowed_public_key_hashes=self.manager.communication.get_allowed_peers(), master=packet.master, ) + self.manager.peer.peers[address] = peer + + # if the peer is trusted + if peer.is_trusted(self.manager): + # make sure we trust the peers trusted by this peer + for public_key_hash in peer.allowed_public_key_hashes: + self.manager.communication.trust_peer_hash(public_key_hash) diff --git a/source/behaviors/events/base/BaseTrustedEvent.py b/source/behaviors/events/base/BaseTrustedEvent.py index 0c3ab21..1733976 100644 --- a/source/behaviors/events/base/BaseTrustedEvent.py +++ b/source/behaviors/events/base/BaseTrustedEvent.py @@ -17,7 +17,3 @@ class BaseTrustedEvent(BaseEvent, abc.ABC): # check if it is trusted if peer is None or not peer.is_trusted(self.manager): raise UntrustedPeerException(peer) - - # make sure we trust the peer trusted by this peer - for public_key_hash in peer.allowed_public_key_hashes: - self.manager.communication.trust_peer_hash(public_key_hash) diff --git a/source/behaviors/roles/MasterRole.py b/source/behaviors/roles/MasterRole.py index 6a9b481..36ab2d7 100644 --- a/source/behaviors/roles/MasterRole.py +++ b/source/behaviors/roles/MasterRole.py @@ -1,8 +1,10 @@ import os from datetime import datetime, timedelta +import numpy import pause import pydub +from numpy.f2py.auxfuncs import l_and from pydub.utils import make_chunks from source.behaviors.roles import base @@ -35,6 +37,9 @@ class MasterRole(base.BaseActiveRole): # calculate the required chunk duration to reach that size self.chunk_duration = timedelta(milliseconds=self.TARGET_SIZE / bytes_per_ms) + # latency of the audio data + self.latency: timedelta = timedelta(seconds=0) + # split the audio into chunks self.chunk_count = 0 self.chunks = make_chunks(self.audio, self.chunk_duration.total_seconds() * 1000) @@ -48,10 +53,27 @@ class MasterRole(base.BaseActiveRole): # get the current chunk (loop indefinitely) chunk = self.chunks[self.chunk_count % len(self.chunks)] + # calculate the maximum latency of the network + latencies: list[timedelta] = list(filter( + lambda latency: latency is not None, + map( + lambda peer: peer.latency, + self.manager.peer.peers.values()) + ) + ) + # if there is not enough data, default to 5 seconds of latency + latency = timedelta(seconds=1) if len(latencies) <= 1 else numpy.max(latencies) + + # if the calculated latency is higher than the current one, update it + if latency > self.latency: + self.latency = latency + + print(self.latency) + # broadcast it in the network audio_packet = AudioPacket( # TODO(Faraphel): adjust time depending on the network average latency ? - datetime.now() + timedelta(seconds=5), # play it in some seconds to let all the machines get the sample + datetime.now() + self.latency, # play it in some seconds to let all the machines get the sample chunk.channels, chunk.frame_rate, chunk.sample_width, diff --git a/source/managers/CommunicationManager.py b/source/managers/CommunicationManager.py index 10cf94e..8253c0d 100644 --- a/source/managers/CommunicationManager.py +++ b/source/managers/CommunicationManager.py @@ -1,9 +1,10 @@ import hashlib import json import socket +import struct import typing import zlib -from datetime import datetime +from datetime import datetime, timedelta import bidict import psutil @@ -189,7 +190,12 @@ class CommunicationManager: return packet_type.unpack(data) def send(self, packet: packets.base.BasePacket, cipher_type: CipherType, address: tuple): - self.socket.sendto(self.packet_encode(packet, cipher_type), address) + payload = ( + struct.pack(" typing.Iterator[tuple]: """ diff --git a/source/structures/Peer.py b/source/structures/Peer.py index 5de30c6..a580b0d 100644 --- a/source/structures/Peer.py +++ b/source/structures/Peer.py @@ -1,7 +1,11 @@ +import collections import dataclasses -from datetime import datetime +import typing +from datetime import datetime, timedelta from typing import Optional +import numpy as np + @dataclasses.dataclass class Peer: @@ -14,8 +18,13 @@ class Peer: secret_key: Optional[bytes] = dataclasses.field(default=None, repr=False) # additional public key hashes used by this peer - allowed_public_key_hashes: set[bytes] = dataclasses.field(default=set, repr=False) + allowed_public_key_hashes: set[bytes] = dataclasses.field(default_factory=set, repr=False) + # list of the latest latencies values + latest_latencies: collections.deque[timedelta] = dataclasses.field( + default_factory=lambda: collections.deque(maxlen=50), # use a queue of 50 elements to only keep the latest values + repr=False + ) # when did the peer last communication with us occurred last_interaction: datetime = dataclasses.field(default_factory=datetime.now) @@ -28,3 +37,15 @@ class Peer: """ return manager.communication.is_peer_trusted(self.public_key) + + @property + def latency(self) -> typing.Optional[timedelta]: + # check if there are data about the latency + if len(self.latest_latencies) == 0: + return None + + # get a high percentile of the latency + return timedelta(seconds=typing.cast(float, np.percentile( + list(map(lambda latency: latency.total_seconds(), self.latest_latencies)), + 90 + )))