M2-PT-DRP/source/managers/DriveManager.py
study-faraphel 9480339b89 improved error handling
instead of having a component crashing, it will issue a warning with the error
2025-02-02 23:31:06 +01:00

82 lines
No EOL
2.9 KiB
Python

import hashlib
import json
import random
import tempfile
import traceback
import warnings
from pathlib import Path
import pyudev
from source.utils.system.mount import mount, unmount
class DriveManager:
    """
    Manage the USB drive that can be connected to share information.

    The manager listens for block partitions through udev. When a partition
    labelled "drp" is added, it is mounted in a temporary directory, its
    "config.json" and "public_keys.json" files are created if missing, our own
    public key hash is registered in the latter, and every hash listed in it is
    handed to the communication manager as a trusted peer.
    """

    def __init__(self, manager: "Manager"):
        """
        Create the udev monitor used to detect the partitions.

        :param manager: the main manager; its ``communication`` attribute is
            used to read our public key and to trust the peers.
        """

        self.manager = manager

        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        # only partitions of block devices are interesting to us
        self.monitor.filter_by(subsystem='block', device_type='partition')

    def handle(self) -> None:
        """
        Wait for the next partition event and process it.

        Events that are not the addition of a partition labelled "drp" are
        ignored. Any error raised while mounting the partition or reading and
        writing its files is propagated to the caller.
        """

        device = self.monitor.poll()

        # check if this is a new device
        if device.action != 'add':
            return

        # check if the device is labelled "drp"
        # NOTE: a partition without any label has no "ID_FS_LABEL" property,
        # so a plain lookup would raise a KeyError instead of ignoring it
        if device.properties.get("ID_FS_LABEL") != "drp":
            return

        # mount the device inside a temporary directory
        with tempfile.TemporaryDirectory() as directory:
            directory = Path(directory)

            # mount outside the "try" block: if the mount fails, there is
            # nothing to unmount and the original error must not be hidden
            mount(device.device_node, directory, device.properties["ID_FS_TYPE"])

            try:
                self._synchronise(directory)
            finally:
                # always release the drive, even if the synchronisation failed
                unmount(directory)

    def _synchronise(self, directory: Path) -> None:
        """
        Synchronise our configuration and trusted keys with a mounted drive.

        :param directory: the directory where the drive is mounted.
        :raises json.JSONDecodeError: if one of the files is not valid JSON.
        """

        # look for the configuration file
        configuration_path: Path = directory / "config.json"

        # if it does not exist, create a default one
        if not configuration_path.exists():
            configuration_path.write_text(json.dumps({"channel": random.randint(1, 2**16)}))

        # load the configuration (not used yet, but this checks that it is valid JSON)
        configuration: dict = json.loads(configuration_path.read_text())
        # TODO(Faraphel): read the configuration channel to avoid conflicts

        # look for the public key file
        public_keys_path: Path = directory / "public_keys.json"

        # if it does not exist, create an empty one, we will be added just after
        if not public_keys_path.exists():
            public_keys_path.write_text(json.dumps([]))

        public_keys_hashes: list = json.loads(public_keys_path.read_text())

        # get our public key hash
        self_public_key_hash = hashlib.sha256(self.manager.communication.public_key).hexdigest()

        # if we are missing from the list, add ourselves and save it
        if self_public_key_hash not in public_keys_hashes:
            public_keys_hashes.append(self_public_key_hash)
            public_keys_path.write_text(json.dumps(public_keys_hashes))

        # trust all the public keys in the file
        for public_key_hash in public_keys_hashes:
            self.manager.communication.trust_peer_hash(public_key_hash)

    def loop(self) -> None:
        """
        Handle the partition events forever.

        An error in one event must not stop the component: it is reported as a
        warning containing the traceback, and the next event is awaited.
        """

        while True:
            try:
                self.handle()

            except Exception:  # NOQA
                warnings.warn(traceback.format_exc())