import hashlib import json import random import tempfile import traceback import warnings from pathlib import Path import pyudev from source.utils.system.mount import mount, unmount class DriveManager: """ Manage the usb drive that can be connected to share informations """ def __init__(self, manager: "Manager"): self.manager = manager self.context = pyudev.Context() self.monitor = pyudev.Monitor.from_netlink(self.context) self.monitor.filter_by(subsystem='block', device_type='partition') def handle(self) -> None: device = self.monitor.poll() # check if this is a new device if device.action != 'add': return # check if the device is labelled "drp" if device.properties["ID_FS_LABEL"] != "drp": return # mount the device inside a temporary directory with tempfile.TemporaryDirectory() as directory: directory = Path(directory) try: # mount the device mount(device.device_node, directory, device.properties["ID_FS_TYPE"]) # look for the configuration file configuration_path: Path = directory / "config.json" # if it does not exist, create a default one if not configuration_path.exists(): configuration_path.write_text(json.dumps({"channel": random.randint(1, 2**16),})) # load the configuration configuration: dict = json.loads(configuration_path.read_text()) # TODO(Faraphel): read the configuration channel to avoid conflicts # look for the public key file public_keys_path: Path = directory / "public_keys.json" # if it does not exist, create a default one with our public key if not public_keys_path.exists(): public_keys_path.write_text(json.dumps([])) public_keys_hashes: list = json.loads(public_keys_path.read_text()) # get our public key hash self_public_key_hash = hashlib.sha256(self.manager.communication.public_key).hexdigest() # if we are missing of the list, add ourselves and save it if self_public_key_hash not in public_keys_hashes: public_keys_hashes.append(self_public_key_hash) public_keys_path.write_text(json.dumps(public_keys_hashes)) # trust all the public keys in the file for public_key_hash in public_keys_hashes: self.manager.communication.trust_peer_hash(public_key_hash) finally: unmount(directory) def loop(self) -> None: while True: try: self.handle() except Exception: # NOQA warnings.warn(traceback.format_exc())