M2-PT-DRP/source/managers/DriveManager.py

76 lines
2.7 KiB
Python

import hashlib
import json
import random
import tempfile
from pathlib import Path
import pyudev
from source.utils.system.mount import mount, unmount
class DriveManager:
    """
    Manage the USB drive that can be connected to share information.

    The manager listens for udev events on block-device partitions. When a
    partition labelled "drp" is added, it is mounted in a temporary directory,
    its "config.json" and "public_keys.json" files are created if missing,
    the hash of our own public key is registered in it, and every key hash
    listed on the drive is handed to ``manager.communication.trust_peer_hash``.
    """

    def __init__(self, manager: "Manager"):
        """
        :param manager: the owning manager; its ``communication`` attribute is
            used to read our public key and to trust the peers found on the drive
        """
        self.manager = manager

        # watch the udev events, restricted to the partitions of block devices
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        self.monitor.filter_by(subsystem='block', device_type='partition')

    def handle(self) -> None:
        """
        Wait for the next udev event and process it if it is a "drp" drive being plugged in.

        Events that are not the addition of a partition labelled "drp" are ignored.
        """
        device = self.monitor.poll()

        # defensive: poll() may yield no device (e.g. if it is ever given a timeout)
        if device is None:
            return

        # check if this is a new device
        if device.action != 'add':
            return

        # check if the device is labelled "drp"
        # NOTE: partitions without any label have no "ID_FS_LABEL" property at all,
        # so use .get() to ignore them instead of raising a KeyError
        if device.properties.get("ID_FS_LABEL") != "drp":
            return

        # mount the device inside a temporary directory
        with tempfile.TemporaryDirectory() as directory:
            directory = Path(directory)

            # mount outside the try block: if the mount itself fails, there is
            # nothing to unmount and the original error must not be masked
            mount(device.device_node, directory, device.properties["ID_FS_TYPE"])

            try:
                self._synchronise(directory)
            finally:
                # always release the drive, even if the synchronisation failed
                unmount(directory)

    def _synchronise(self, directory: Path) -> None:
        """
        Synchronise the content of a mounted drive with our trusted peers.

        :param directory: the directory where the drive is mounted
        :raises json.JSONDecodeError: if a file on the drive is not valid JSON
        """
        # look for the configuration file
        configuration_path: Path = directory / "config.json"

        # if it does not exist, create a default one with a random channel
        if not configuration_path.exists():
            configuration_path.write_text(json.dumps({"channel": random.randint(1, 2**16),}))

        # load the configuration
        configuration: dict = json.loads(configuration_path.read_text())
        # TODO(Faraphel): read the configuration channel to avoid conflicts

        # look for the public key file
        public_keys_path: Path = directory / "public_keys.json"

        # if it does not exist, create a default one with an empty list of hashes
        if not public_keys_path.exists():
            public_keys_path.write_text(json.dumps([]))

        public_keys_hashes: list = json.loads(public_keys_path.read_text())

        # get our public key hash (hexadecimal SHA-256 digest)
        # NOTE(review): assumes communication.public_key is bytes-like - confirm
        self_public_key_hash = hashlib.sha256(self.manager.communication.public_key).hexdigest()

        # if we are missing from the list, add ourselves and save it
        if self_public_key_hash not in public_keys_hashes:
            public_keys_hashes.append(self_public_key_hash)
            public_keys_path.write_text(json.dumps(public_keys_hashes))

        # trust all the public keys in the file
        for public_key_hash in public_keys_hashes:
            self.manager.communication.trust_peer_hash(public_key_hash)

    def loop(self) -> None:
        """
        Handle the drive events forever. This call never returns.
        """
        while True:
            self.handle()