added the possibility to plug an USB drive to configure and add new public keys

This commit is contained in:
study-faraphel 2025-02-01 13:56:17 +01:00
parent b5c19561a1
commit a5e4830acb
15 changed files with 277 additions and 33 deletions

View file

@ -1,17 +1,20 @@
# extended standard
bidict
pause
sortedcontainers
numpy
bidict~=0.23.1
pause~=0.3
sortedcontainers~=2.4.0
numpy~=2.2.1
# networking
psutil
msgpack
psutil~=6.1.1
msgpack~=1.1.0
# cryptography
cryptography
cryptography~=44.0.0
# audio
pydub
pydub~=0.25.1
audioop-lts
sounddevice
sounddevice~=0.5.1
# system
pyudev~=0.24.3

View file

@ -12,12 +12,8 @@ class PeerEvent(base.BaseEvent):
if self.manager.communication.is_peer_banned(packet.public_key):
return
# TODO(Faraphel): SHOULD NOT BE TRUSTED AUTOMATICALLY !
self.manager.communication.trust_peer(packet.public_key)
# update our peers database to add new peer information
self.manager.peer.peers[address] = structures.Peer(
public_key=packet.public_key,
master=packet.master,
trusted=self.manager.communication.is_peer_trusted(packet.public_key)
master=packet.master
)

View file

@ -14,5 +14,5 @@ class BaseTrustedEvent(BaseEvent, abc.ABC):
# get the peer that sent the message
peer = self.manager.peer.peers.get(address)
# check if it is trusted
if peer is None or not peer.trusted:
if peer is None or not peer.is_trusted(self.manager):
raise UntrustedPeerException(peer)

View file

@ -1,8 +1,7 @@
import typing
from datetime import timedelta, datetime
from source import managers, packets, structures
from source.behaviors.roles import base, UndefinedRole
from source.behaviors.roles import base
from source.utils.crypto.type import CipherType
@ -31,4 +30,5 @@ class SlaveRole(base.BaseActiveRole):
# check if the master interacted recently
if datetime.now() - master_peer.last_interaction > timedelta(seconds=10):
# if the master didn't react in a moment, return to undefined mode
from source.behaviors.roles import UndefinedRole
self.manager.role.current = UndefinedRole(self.manager)

View file

@ -2,7 +2,6 @@ from datetime import datetime, timedelta
import pause
from source.behaviors import roles
from source.behaviors.roles import base
@ -32,7 +31,8 @@ class UndefinedRole(base.BaseRole):
# if no other peers have been found
if len(remote_peers) == 0:
# declare ourselves as the master of the network
self.manager.role.current = roles.MasterRole(self.manager)
from source.behaviors.roles import MasterRole
self.manager.role.current = MasterRole(self.manager)
return
# SCENARIO 2 - network with a master
@ -47,9 +47,10 @@ class UndefinedRole(base.BaseRole):
# if there is a master, become a slave
if len(master_peers) >= 1:
# get the first master information
master_address, master_peer = master_peers[0]
master_address, master_peer = next(iter(master_peers.items()))
# declare ourselves as a slave of the network
self.manager.role.current = roles.SlaveRole(self.manager, master_address)
from source.behaviors.roles import SlaveRole
self.manager.role.current = SlaveRole(self.manager, master_address)
return

View file

@ -1,5 +1,5 @@
from . import base
from .UndefinedRole import UndefinedRole
from .MasterRole import MasterRole
from .SlaveRole import SlaveRole
from .UndefinedRole import UndefinedRole
from .SlaveRole import SlaveRole

View file

@ -52,7 +52,7 @@ class CommunicationManager:
public_key_path.write_bytes(self.public_key)
self._trusted_peers_path = self.manager.storage / "trusted-peers.json"
self._trusted_peers: set[str] = set()
self._trusted_peers: set[str] = set(hashlib.sha256(self.public_key).hexdigest())
if self._trusted_peers_path.exists():
self._trusted_peers = set(json.loads(self._trusted_peers_path.read_text()))
@ -263,6 +263,17 @@ class CommunicationManager:
self._banned_peers_path.write_text(json.dumps(list(self._banned_peers)))
def trust_peer_hash(self, public_key_hash: str) -> None:
"""
Mark a peer hash as trusted for future connexion
Automatically save it to a file
:param public_key_hash: the public key hash of the peer
"""
self._trusted_peers.add(public_key_hash)
self.save_trusted_peers()
def trust_peer(self, public_key: bytes) -> None:
"""
Mark a peer as trusted for future connexion
@ -270,8 +281,17 @@ class CommunicationManager:
:param public_key: the public key of the peer
"""
self._trusted_peers.add(hashlib.sha256(public_key).hexdigest())
self.save_trusted_peers()
self.trust_peer_hash(hashlib.sha256(public_key).hexdigest())
def ban_peer_hash(self, public_key_hash: str) -> None:
"""
Ban a peer from being used for any future connexion
Automatically save it to a file
:param public_key_hash: the public key hash of the peer
"""
self._banned_peers.add(public_key_hash)
self.save_banned_peers()
def ban_peer(self, public_key: bytes) -> None:
"""
@ -280,8 +300,7 @@ class CommunicationManager:
:param public_key: the public key of the peer
"""
self._banned_peers.add(hashlib.sha256(public_key).hexdigest())
self.save_banned_peers()
self.ban_peer_hash(hashlib.sha256(public_key).hexdigest())
def is_peer_trusted(self, public_key: bytes) -> bool:
"""

View file

@ -0,0 +1,76 @@
import hashlib
import json
import random
import tempfile
from pathlib import Path
import pyudev
from source.utils.system.mount import mount, unmount
class DriveManager:
"""
Manage the usb drive that can be connected to share informations
"""
def __init__(self, manager: "Manager"):
self.manager = manager
self.context = pyudev.Context()
self.monitor = pyudev.Monitor.from_netlink(self.context)
self.monitor.filter_by(subsystem='block', device_type='partition')
def handle(self) -> None:
device = self.monitor.poll()
# check if this is a new device
if device.action != 'add':
return
# check if the device is labelled "drp"
if device.properties["ID_FS_LABEL"] != "drp":
return
# mount the device inside a temporary directory
with tempfile.TemporaryDirectory() as directory:
directory = Path(directory)
try:
# mount the device
mount(device.device_node, directory, device.properties["ID_FS_TYPE"])
# look for the configuration file
configuration_path: Path = directory / "config.json"
# if it does not exist, create a default one
if not configuration_path.exists():
configuration_path.write_text(json.dumps({"channel": random.randint(1, 2**16),}))
# load the configuration
configuration: dict = json.loads(configuration_path.read_text())
# TODO(Faraphel): read the configuration channel to avoid conflicts
# look for the public key file
public_keys_path: Path = directory / "public_keys.json"
# if it does not exist, create a default one with our public key
if not public_keys_path.exists():
public_keys_path.write_text(json.dumps([]))
public_keys_hashes: list = json.loads(public_keys_path.read_text())
# get our public key hash
self_public_key_hash = hashlib.sha256(self.manager.communication.public_key).hexdigest()
# if we are missing of the list, add ourselves and save it
if self_public_key_hash not in public_keys_hashes:
public_keys_hashes.append(self_public_key_hash)
public_keys_path.write_text(json.dumps(public_keys_hashes))
# trust all the public keys in the file
for public_key_hash in public_keys_hashes:
self.manager.communication.trust_peer_hash(public_key_hash)
finally:
unmount(directory)
def loop(self) -> None:
while True:
self.handle()

View file

@ -11,7 +11,7 @@ class Manager:
"""
def __init__(self, interface: str):
from . import CommunicationManager, EventManager, RoleManager, AudioManager, PeerManager
from . import CommunicationManager, EventManager, RoleManager, AudioManager, PeerManager, DriveManager
self.storage = Path("./storage/")
self.storage.mkdir(exist_ok=True)
@ -41,6 +41,9 @@ class Manager:
# peer manager
self.peer = PeerManager(self)
# drive manager
self.drive = DriveManager(self)
def loop(self) -> None:
"""
Handle the sub-managers forever
@ -51,13 +54,16 @@ class Manager:
role_thread = threading.Thread(target=self.role.loop)
audio_thread = threading.Thread(target=self.audio.loop)
peer_thread = threading.Thread(target=self.peer.loop)
drive_thread = threading.Thread(target=self.drive.loop)
event_thread.start()
role_thread.start()
audio_thread.start()
peer_thread.start()
drive_thread.start()
event_thread.join()
role_thread.join()
audio_thread.join()
peer_thread.join()
drive_thread.join()

View file

@ -3,5 +3,6 @@ from .EventManager import EventManager
from .RoleManager import RoleManager
from .AudioManager import AudioManager
from .PeerManager import PeerManager
from .DriveManager import DriveManager
from .Manager import Manager

View file

@ -13,8 +13,15 @@ class Peer:
# secret symmetric key
secret_key: Optional[bytes] = dataclasses.field(default=None, repr=False)
# is the machine trusted
trusted: bool = dataclasses.field(default=False)
# when did the peer last communication with us occurred
last_interaction: datetime = dataclasses.field(default_factory=datetime.now)
def is_trusted(self, manager) -> bool:
"""
Check if the peer is trusted
:param manager: the manager
:return: is the peer trusted
"""
return manager.communication.is_peer_trusted(self.public_key)

View file

@ -1,3 +1,4 @@
import hashlib
import os
from cryptography.hazmat.backends import default_backend
@ -96,4 +97,4 @@ def aes_cbc_decrypt(payload: bytes, key: bytes) -> bytes:
unpadder = padding.PKCS7(128).unpadder()
data = unpadder.update(decrypted_data) + unpadder.finalize()
return data
return data

View file

View file

@ -0,0 +1,28 @@
import ctypes
import ctypes.util
import os
from pathlib import Path
libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
libc.umount2.argtypes = (ctypes.c_char_p, ctypes.c_ulong)
def mount(source: Path | str, target: Path | str, format_: str, flags: int = 0, data: str = ""):
result = libc.mount(
ctypes.c_char_p(str(source).encode('utf-8')),
ctypes.c_char_p(str(target).encode('utf-8')),
ctypes.c_char_p(str(format_).encode('utf-8')),
flags,
ctypes.c_char_p(data.encode('utf-8')) if data is not None else None
)
if result != 0:
raise OSError(f"Could not perform mount: {os.strerror(ctypes.get_errno())}")
def unmount(target: Path | str, flags: int = 0):
return libc.umount2(
ctypes.c_char_p(str(target).encode('utf-8')),
flags
)

106
test.py Normal file
View file

@ -0,0 +1,106 @@
import subprocess
class BatmanController:
# TODO: need to be able to get ping in the network if possible, otherwise no other uses
def __init__(self):
pass
class ChronyController:
"""
Wrapper to control chrony to manipulate time synchronisation
NOTE: the ideal method would be to use `chronyd.sock` to communicate, but it seems to only work with raw binary data
"""
def __init__(self):
self.clear_sources()
self.deny_client("all")
@staticmethod
def run_command(*args: str) -> str:
# run the command through the chrony command line tool
process = subprocess.run(
["chronyc", *args],
stdout=subprocess.PIPE,
)
# raise an error if the command did not ended up successfully
process.check_returncode()
# return the stdout of the command
return process.stdout.decode()
def get_sources(self) -> list[...]:
"""
Get the list of NTP sources
:return: the list of NTP sources
"""
...
def add_source(self, kind: str, source: ...) -> None:
"""
Add a source to chrony
"""
self.run_command("add", "server", host, "iburst")
def remove_source(self, source: ...) -> None:
"""
Remove a source from chrony
"""
...
def clear_sources(self) -> None:
"""
Clear the list of sources used by chrony
"""
for source in self.get_sources():
self.remove_source(source)
def get_clients(self) -> list[...]:
"""
Get the list of clients using us as a server
:return: the list of clients
"""
...
def allow_client(self, client: ...) -> None:
"""
Allow a client to use us as a server
:param client: the client information
"""
...
def allow_all_clients(self) -> None:
"""
Allow all clients to use us as a time-server
"""
def deny_client(self, client: ...) -> None:
"""
Deny a client to use us as a server
:param client: the client information
"""
...
def deny_all_clients(self) -> None:
"""
Deny all clients from using us as a time-server
"""
self.deny_client("all")
if __name__ == "__main__":
chrony = ChronyController()
while True:
command = input("chronyc >>> ")
print(chrony.run_command(command))