Compare commits
2 commits
ad49484de9
...
e12073ede0
Author | SHA1 | Date | |
---|---|---|---|
e12073ede0 | |||
a8365887d7 |
5 changed files with 74 additions and 13 deletions
|
@ -13,8 +13,15 @@ class PeerEvent(base.BaseEvent):
|
|||
return
|
||||
|
||||
# update our peers database to add new peer information
|
||||
self.manager.peer.peers[address] = structures.Peer(
|
||||
peer = structures.Peer(
|
||||
public_key=packet.public_key,
|
||||
allowed_public_key_hashes=self.manager.communication.get_allowed_peers(),
|
||||
master=packet.master,
|
||||
)
|
||||
self.manager.peer.peers[address] = peer
|
||||
|
||||
# if the peer is trusted
|
||||
if peer.is_trusted(self.manager):
|
||||
# make sure we trust the peers trusted by this peer
|
||||
for public_key_hash in peer.allowed_public_key_hashes:
|
||||
self.manager.communication.trust_peer_hash(public_key_hash)
|
||||
|
|
|
@ -17,7 +17,3 @@ class BaseTrustedEvent(BaseEvent, abc.ABC):
|
|||
# check if it is trusted
|
||||
if peer is None or not peer.is_trusted(self.manager):
|
||||
raise UntrustedPeerException(peer)
|
||||
|
||||
# make sure we trust the peer trusted by this peer
|
||||
for public_key_hash in peer.allowed_public_key_hashes:
|
||||
self.manager.communication.trust_peer_hash(public_key_hash)
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
import os
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import numpy
|
||||
import pause
|
||||
import pydub
|
||||
from numpy.f2py.auxfuncs import l_and
|
||||
from pydub.utils import make_chunks
|
||||
|
||||
from source.behaviors.roles import base
|
||||
|
@ -35,6 +37,9 @@ class MasterRole(base.BaseActiveRole):
|
|||
# calculate the required chunk duration to reach that size
|
||||
self.chunk_duration = timedelta(milliseconds=self.TARGET_SIZE / bytes_per_ms)
|
||||
|
||||
# latency of the audio data
|
||||
self.latency: timedelta = timedelta(seconds=0)
|
||||
|
||||
# split the audio into chunks
|
||||
self.chunk_count = 0
|
||||
self.chunks = make_chunks(self.audio, self.chunk_duration.total_seconds() * 1000)
|
||||
|
@ -45,13 +50,30 @@ class MasterRole(base.BaseActiveRole):
|
|||
# TODO(Faraphel): share the secret key generated with the other *allowed* peers ! How to select them ? A file ?
|
||||
# TODO(Faraphel): check if another server is emitting sound in the network. Return to undefined if yes
|
||||
|
||||
# get the current chunk
|
||||
chunk = self.chunks[self.chunk_count]
|
||||
# get the current chunk (loop indefinitely)
|
||||
chunk = self.chunks[self.chunk_count % len(self.chunks)]
|
||||
|
||||
# calculate the maximum latency of the network
|
||||
latencies: list[timedelta] = list(filter(
|
||||
lambda latency: latency is not None,
|
||||
map(
|
||||
lambda peer: peer.latency,
|
||||
self.manager.peer.peers.values())
|
||||
)
|
||||
)
|
||||
# if there is not enough data, default to 5 seconds of latency
|
||||
latency = timedelta(seconds=1) if len(latencies) <= 1 else numpy.max(latencies)
|
||||
|
||||
# if the calculated latency is higher than the current one, update it
|
||||
if latency > self.latency:
|
||||
self.latency = latency
|
||||
|
||||
print(self.latency)
|
||||
|
||||
# broadcast it in the network
|
||||
audio_packet = AudioPacket(
|
||||
# TODO(Faraphel): adjust time depending on the network average latency ?
|
||||
datetime.now() + timedelta(seconds=5), # play it in some seconds to let all the machines get the sample
|
||||
datetime.now() + self.latency, # play it in some seconds to let all the machines get the sample
|
||||
chunk.channels,
|
||||
chunk.frame_rate,
|
||||
chunk.sample_width,
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
import hashlib
|
||||
import json
|
||||
import socket
|
||||
import struct
|
||||
import typing
|
||||
import zlib
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import bidict
|
||||
import psutil
|
||||
|
@ -189,7 +190,12 @@ class CommunicationManager:
|
|||
return packet_type.unpack(data)
|
||||
|
||||
def send(self, packet: packets.base.BasePacket, cipher_type: CipherType, address: tuple):
|
||||
self.socket.sendto(self.packet_encode(packet, cipher_type), address)
|
||||
payload = (
|
||||
struct.pack("<d", datetime.now().timestamp()) +
|
||||
self.packet_encode(packet, cipher_type)
|
||||
)
|
||||
|
||||
self.socket.sendto(payload, address)
|
||||
|
||||
def broadcast(self, packet: packets.base.BasePacket, cipher_type: CipherType):
|
||||
"""
|
||||
|
@ -214,14 +220,23 @@ class CommunicationManager:
|
|||
# receive a message
|
||||
payload, address = self.socket.recvfrom(65536)
|
||||
|
||||
# unpack the payload
|
||||
timestamp = struct.unpack("<d", payload[:struct.calcsize("<d")])[0]
|
||||
datetime_sent = datetime.fromtimestamp(timestamp)
|
||||
packet_data = payload[struct.calcsize("<d"):]
|
||||
|
||||
latency: timedelta = datetime.now() - datetime_sent
|
||||
|
||||
# check if there is a peer associated with this address
|
||||
peer: structures.Peer = self.manager.peer.peers.get(address)
|
||||
if peer is not None:
|
||||
# add the latency to the peer
|
||||
peer.latest_latencies.append(latency)
|
||||
# update the latest interaction date
|
||||
peer.last_interaction = datetime.now()
|
||||
|
||||
# decode the payload
|
||||
return self.packet_decode(payload), address
|
||||
return self.packet_decode(packet_data), address
|
||||
|
||||
def get_local_addresses(self) -> typing.Iterator[tuple]:
|
||||
"""
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
import collections
|
||||
import dataclasses
|
||||
from datetime import datetime
|
||||
import typing
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Peer:
|
||||
|
@ -14,8 +18,13 @@ class Peer:
|
|||
secret_key: Optional[bytes] = dataclasses.field(default=None, repr=False)
|
||||
|
||||
# additional public key hashes used by this peer
|
||||
allowed_public_key_hashes: set[bytes] = dataclasses.field(default=set, repr=False)
|
||||
allowed_public_key_hashes: set[bytes] = dataclasses.field(default_factory=set, repr=False)
|
||||
|
||||
# list of the latest latencies values
|
||||
latest_latencies: collections.deque[timedelta] = dataclasses.field(
|
||||
default_factory=lambda: collections.deque(maxlen=50), # use a queue of 50 elements to only keep the latest values
|
||||
repr=False
|
||||
)
|
||||
# when did the peer last communication with us occurred
|
||||
last_interaction: datetime = dataclasses.field(default_factory=datetime.now)
|
||||
|
||||
|
@ -28,3 +37,15 @@ class Peer:
|
|||
"""
|
||||
|
||||
return manager.communication.is_peer_trusted(self.public_key)
|
||||
|
||||
@property
|
||||
def latency(self) -> typing.Optional[timedelta]:
|
||||
# check if there are data about the latency
|
||||
if len(self.latest_latencies) == 0:
|
||||
return None
|
||||
|
||||
# get a high percentile of the latency
|
||||
return timedelta(seconds=typing.cast(float, np.percentile(
|
||||
list(map(lambda latency: latency.total_seconds(), self.latest_latencies)),
|
||||
90
|
||||
)))
|
||||
|
|
Loading…
Reference in a new issue