Compare commits

...

2 commits

5 changed files with 74 additions and 13 deletions

View file

@ -13,8 +13,15 @@ class PeerEvent(base.BaseEvent):
return
# update our peers database to add new peer information
self.manager.peer.peers[address] = structures.Peer(
peer = structures.Peer(
public_key=packet.public_key,
allowed_public_key_hashes=self.manager.communication.get_allowed_peers(),
master=packet.master,
)
self.manager.peer.peers[address] = peer
# if the peer is trusted
if peer.is_trusted(self.manager):
# make sure we trust the peers trusted by this peer
for public_key_hash in peer.allowed_public_key_hashes:
self.manager.communication.trust_peer_hash(public_key_hash)

View file

@ -17,7 +17,3 @@ class BaseTrustedEvent(BaseEvent, abc.ABC):
# check if it is trusted
if peer is None or not peer.is_trusted(self.manager):
raise UntrustedPeerException(peer)
# make sure we trust the peer trusted by this peer
for public_key_hash in peer.allowed_public_key_hashes:
self.manager.communication.trust_peer_hash(public_key_hash)

View file

@ -1,8 +1,10 @@
import os
from datetime import datetime, timedelta
import numpy
import pause
import pydub
from numpy.f2py.auxfuncs import l_and
from pydub.utils import make_chunks
from source.behaviors.roles import base
@ -35,6 +37,9 @@ class MasterRole(base.BaseActiveRole):
# calculate the required chunk duration to reach that size
self.chunk_duration = timedelta(milliseconds=self.TARGET_SIZE / bytes_per_ms)
# latency of the audio data
self.latency: timedelta = timedelta(seconds=0)
# split the audio into chunks
self.chunk_count = 0
self.chunks = make_chunks(self.audio, self.chunk_duration.total_seconds() * 1000)
@ -45,13 +50,30 @@ class MasterRole(base.BaseActiveRole):
# TODO(Faraphel): share the secret key generated with the other *allowed* peers ! How to select them ? A file ?
# TODO(Faraphel): check if another server is emitting sound in the network. Return to undefined if yes
# get the current chunk
chunk = self.chunks[self.chunk_count]
# get the current chunk (loop indefinitely)
chunk = self.chunks[self.chunk_count % len(self.chunks)]
# calculate the maximum latency of the network
latencies: list[timedelta] = list(filter(
lambda latency: latency is not None,
map(
lambda peer: peer.latency,
self.manager.peer.peers.values())
)
)
# if there is not enough data, default to 5 seconds of latency
latency = timedelta(seconds=1) if len(latencies) <= 1 else numpy.max(latencies)
# if the calculated latency is higher than the current one, update it
if latency > self.latency:
self.latency = latency
print(self.latency)
# broadcast it in the network
audio_packet = AudioPacket(
# TODO(Faraphel): adjust time depending on the network average latency ?
datetime.now() + timedelta(seconds=5), # play it in some seconds to let all the machines get the sample
datetime.now() + self.latency, # play it in some seconds to let all the machines get the sample
chunk.channels,
chunk.frame_rate,
chunk.sample_width,

View file

@ -1,9 +1,10 @@
import hashlib
import json
import socket
import struct
import typing
import zlib
from datetime import datetime
from datetime import datetime, timedelta
import bidict
import psutil
@ -189,7 +190,12 @@ class CommunicationManager:
return packet_type.unpack(data)
def send(self, packet: packets.base.BasePacket, cipher_type: CipherType, address: tuple):
self.socket.sendto(self.packet_encode(packet, cipher_type), address)
payload = (
struct.pack("<d", datetime.now().timestamp()) +
self.packet_encode(packet, cipher_type)
)
self.socket.sendto(payload, address)
def broadcast(self, packet: packets.base.BasePacket, cipher_type: CipherType):
"""
@ -214,14 +220,23 @@ class CommunicationManager:
# receive a message
payload, address = self.socket.recvfrom(65536)
# unpack the payload
timestamp = struct.unpack("<d", payload[:struct.calcsize("<d")])[0]
datetime_sent = datetime.fromtimestamp(timestamp)
packet_data = payload[struct.calcsize("<d"):]
latency: timedelta = datetime.now() - datetime_sent
# check if there is a peer associated with this address
peer: structures.Peer = self.manager.peer.peers.get(address)
if peer is not None:
# add the latency to the peer
peer.latest_latencies.append(latency)
# update the latest interaction date
peer.last_interaction = datetime.now()
# decode the payload
return self.packet_decode(payload), address
return self.packet_decode(packet_data), address
def get_local_addresses(self) -> typing.Iterator[tuple]:
"""

View file

@ -1,7 +1,11 @@
import collections
import dataclasses
from datetime import datetime
import typing
from datetime import datetime, timedelta
from typing import Optional
import numpy as np
@dataclasses.dataclass
class Peer:
@ -14,8 +18,13 @@ class Peer:
secret_key: Optional[bytes] = dataclasses.field(default=None, repr=False)
# additional public key hashes used by this peer
allowed_public_key_hashes: set[bytes] = dataclasses.field(default=set, repr=False)
allowed_public_key_hashes: set[bytes] = dataclasses.field(default_factory=set, repr=False)
# list of the latest latencies values
latest_latencies: collections.deque[timedelta] = dataclasses.field(
default_factory=lambda: collections.deque(maxlen=50), # use a queue of 50 elements to only keep the latest values
repr=False
)
# when did the peer last communication with us occurred
last_interaction: datetime = dataclasses.field(default_factory=datetime.now)
@ -28,3 +37,15 @@ class Peer:
"""
return manager.communication.is_peer_trusted(self.public_key)
@property
def latency(self) -> typing.Optional[timedelta]:
# check if there are data about the latency
if len(self.latest_latencies) == 0:
return None
# get a high percentile of the latency
return timedelta(seconds=typing.cast(float, np.percentile(
list(map(lambda latency: latency.total_seconds(), self.latest_latencies)),
90
)))