Merge pull request 'Fully implement the vote' (#8) from base into main
Reviewed-on: #8
This commit is contained in:
commit
9404c4e964
41 changed files with 1001 additions and 6 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1,3 +1,5 @@
|
|||
# IDE
|
||||
.idea/
|
||||
!.idea/runConfigurations/
|
||||
|
||||
subject.pdf
|
||||
|
|
0
cli/__init__.py
Normal file
0
cli/__init__.py
Normal file
31
cli/__main__.py
Normal file
31
cli/__main__.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
import argparse
|
||||
|
||||
from cli import role
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
subparsers = parser.add_subparsers(dest="role")
|
||||
|
||||
role.admin.load_parse(subparsers)
|
||||
role.client.load_parse(subparsers)
|
||||
role.machine.load_parse(subparsers)
|
||||
|
||||
|
||||
arguments = parser.parse_args()
|
||||
|
||||
|
||||
if not arguments:
|
||||
parser.print_help()
|
||||
exit(1)
|
||||
|
||||
|
||||
match arguments.role:
|
||||
case "admin":
|
||||
role.admin.run(parser, arguments)
|
||||
case "machine":
|
||||
role.machine.run(parser, arguments)
|
||||
case "client":
|
||||
role.client.run(parser, arguments)
|
||||
|
||||
# if the role is unknown, show the usage
|
||||
case _:
|
||||
parser.print_usage()
|
3
cli/role/__init__.py
Normal file
3
cli/role/__init__.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from . import admin
|
||||
from . import client
|
||||
from . import machine
|
2
cli/role/admin/__init__.py
Normal file
2
cli/role/admin/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .argparse import load_parse
|
||||
from .run import run
|
2
cli/role/admin/action/__init__.py
Normal file
2
cli/role/admin/action/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from . import create
|
||||
from . import init
|
2
cli/role/admin/action/create/__init__.py
Normal file
2
cli/role/admin/action/create/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .argparse import load_parse
|
||||
from .run import run
|
11
cli/role/admin/action/create/argparse.py
Normal file
11
cli/role/admin/action/create/argparse.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
import argparse
|
||||
|
||||
from . import create_role
|
||||
|
||||
|
||||
def load_parse(subparsers):
|
||||
subparser: argparse.ArgumentParser = subparsers.add_parser("create")
|
||||
|
||||
subsubparsers = subparser.add_subparsers(dest="create_role")
|
||||
create_role.client.load_parse(subsubparsers)
|
||||
create_role.machine.load_parse(subsubparsers)
|
2
cli/role/admin/action/create/create_role/__init__.py
Normal file
2
cli/role/admin/action/create/create_role/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from . import machine
|
||||
from . import client
|
|
@ -0,0 +1,2 @@
|
|||
from .argparse import load_parse
|
||||
from .run import run
|
|
@ -0,0 +1,8 @@
|
|||
import argparse
|
||||
|
||||
|
||||
def load_parse(subparsers):
|
||||
subparser: argparse.ArgumentParser = subparsers.add_parser("client")
|
||||
|
||||
subparser.add_argument("-u", "--username", dest="username", type=str, required=True)
|
||||
subparser.add_argument("-p", "--password", dest="password", type=str, required=True)
|
39
cli/role/admin/action/create/create_role/client/run.py
Normal file
39
cli/role/admin/action/create/create_role/client/run.py
Normal file
|
@ -0,0 +1,39 @@
|
|||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
import bcrypt
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
|
||||
|
||||
def run(parser: argparse.ArgumentParser, arguments: argparse.Namespace):
|
||||
# TODO(Faraphel): should NOT be named "client"
|
||||
print("creating new client...")
|
||||
|
||||
directory_client = Path(f"./assets/client/{arguments.username}/")
|
||||
if directory_client.exists():
|
||||
raise ValueError("This client already exists !")
|
||||
|
||||
directory_client.mkdir(parents=True)
|
||||
|
||||
# Generate a private key
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048,
|
||||
backend=default_backend(),
|
||||
)
|
||||
|
||||
(directory_client / "private.key").write_bytes(
|
||||
private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.OpenSSH,
|
||||
encryption_algorithm=serialization.BestAvailableEncryption(arguments.password.encode()),
|
||||
)
|
||||
)
|
||||
(directory_client / "public.key").write_bytes(
|
||||
private_key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.OpenSSH,
|
||||
)
|
||||
)
|
|
@ -0,0 +1,2 @@
|
|||
from .argparse import load_parse
|
||||
from .run import run
|
|
@ -0,0 +1,7 @@
|
|||
import argparse
|
||||
|
||||
|
||||
def load_parse(subparsers):
|
||||
subparser: argparse.ArgumentParser = subparsers.add_parser("machine")
|
||||
|
||||
subparser.add_argument("-n", "--name", dest="name", type=str, required=True)
|
67
cli/role/admin/action/create/create_role/machine/run.py
Normal file
67
cli/role/admin/action/create/create_role/machine/run.py
Normal file
|
@ -0,0 +1,67 @@
|
|||
import argparse
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import serialization, hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
|
||||
|
||||
def run(parser: argparse.ArgumentParser, arguments: argparse.Namespace):
|
||||
print("creating a new machine...")
|
||||
|
||||
directory_admin = Path("./assets/admin/")
|
||||
if not directory_admin.exists():
|
||||
raise ValueError("Can't find the admin. Have you initialized it ?")
|
||||
|
||||
directory_machine = Path(f"./assets/machine/{arguments.name}/")
|
||||
if directory_machine.exists():
|
||||
raise ValueError("This machine already exists !")
|
||||
|
||||
directory_machine.mkdir(parents=True)
|
||||
|
||||
# Generate a private key
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048,
|
||||
)
|
||||
|
||||
(directory_machine / "private.key").write_bytes(
|
||||
private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
)
|
||||
|
||||
certificate_admin = x509.load_pem_x509_certificate(
|
||||
(directory_admin / "certificate.pem").read_bytes()
|
||||
)
|
||||
private_key_admin = serialization.load_pem_private_key(
|
||||
(directory_admin / "private.key").read_bytes(),
|
||||
password=None
|
||||
)
|
||||
|
||||
# Generate a self-signed root certificate
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "FR"),
|
||||
x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "Province"),
|
||||
x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Locality"),
|
||||
x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Organization"),
|
||||
x509.NameAttribute(x509.NameOID.COMMON_NAME, "domain.com"),
|
||||
])
|
||||
certificate = (
|
||||
x509.CertificateBuilder()
|
||||
.subject_name(subject)
|
||||
.issuer_name(certificate_admin.subject)
|
||||
.public_key(private_key.public_key())
|
||||
.serial_number(x509.random_serial_number())
|
||||
.not_valid_before(datetime.now() - timedelta(days=1)) # TODO(Faraphel): settings
|
||||
.not_valid_after(datetime.now() + timedelta(days=365)) # TODO(Faraphel): settings
|
||||
.sign(private_key_admin, hashes.SHA256())
|
||||
)
|
||||
|
||||
# Save the root certificate to a file
|
||||
(directory_machine / "certificate.pem").write_bytes(certificate.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
print("machine created !")
|
15
cli/role/admin/action/create/run.py
Normal file
15
cli/role/admin/action/create/run.py
Normal file
|
@ -0,0 +1,15 @@
|
|||
import argparse
|
||||
|
||||
from . import create_role
|
||||
|
||||
|
||||
def run(parser: argparse.ArgumentParser, arguments: argparse.Namespace):
|
||||
match arguments.create_role:
|
||||
case "client":
|
||||
create_role.client.run(parser, arguments)
|
||||
case "machine":
|
||||
create_role.machine.run(parser, arguments)
|
||||
|
||||
# if the subcommand is not known, show the help
|
||||
case _:
|
||||
parser.print_usage()
|
2
cli/role/admin/action/init/__init__.py
Normal file
2
cli/role/admin/action/init/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .argparse import load_parse
|
||||
from .run import run
|
5
cli/role/admin/action/init/argparse.py
Normal file
5
cli/role/admin/action/init/argparse.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
import argparse
|
||||
|
||||
|
||||
def load_parse(subparsers):
|
||||
subparser: argparse.ArgumentParser = subparsers.add_parser("init")
|
53
cli/role/admin/action/init/run.py
Normal file
53
cli/role/admin/action/init/run.py
Normal file
|
@ -0,0 +1,53 @@
|
|||
import argparse
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import serialization, hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
|
||||
|
||||
def run(parser: argparse.ArgumentParser, arguments: argparse.Namespace):
|
||||
print("initializing the admin...")
|
||||
|
||||
directory = Path(f"./assets/admin/")
|
||||
if directory.exists():
|
||||
raise ValueError("The admin already exists !")
|
||||
|
||||
directory.mkdir(parents=True)
|
||||
|
||||
# Generate a private key
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=4096,
|
||||
)
|
||||
|
||||
(directory / "private.key").write_bytes(
|
||||
private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a self-signed root certificate
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "FR"),
|
||||
x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "Province"),
|
||||
x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Locality"),
|
||||
x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Organization"),
|
||||
x509.NameAttribute(x509.NameOID.COMMON_NAME, "domain.com"),
|
||||
])
|
||||
certificate = (
|
||||
x509.CertificateBuilder()
|
||||
.subject_name(subject)
|
||||
.issuer_name(subject)
|
||||
.public_key(private_key.public_key())
|
||||
.serial_number(x509.random_serial_number())
|
||||
.not_valid_before(datetime.now() - timedelta(days=1)) # TODO(Faraphel): settings
|
||||
.not_valid_after(datetime.now() + timedelta(days=365)) # TODO(Faraphel): settings
|
||||
.sign(private_key, hashes.SHA256())
|
||||
)
|
||||
|
||||
# Save the root certificate to a file
|
||||
(directory / "certificate.pem").write_bytes(certificate.public_bytes(serialization.Encoding.PEM))
|
11
cli/role/admin/argparse.py
Normal file
11
cli/role/admin/argparse.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
import argparse
|
||||
|
||||
from . import action
|
||||
|
||||
|
||||
def load_parse(subparsers):
|
||||
subparser: argparse.ArgumentParser = subparsers.add_parser("admin")
|
||||
|
||||
subsubparsers = subparser.add_subparsers(dest="action")
|
||||
action.create.load_parse(subsubparsers)
|
||||
action.init.load_parse(subsubparsers)
|
15
cli/role/admin/run.py
Normal file
15
cli/role/admin/run.py
Normal file
|
@ -0,0 +1,15 @@
|
|||
import argparse
|
||||
|
||||
from . import action
|
||||
|
||||
|
||||
def run(parser: argparse.ArgumentParser, arguments: argparse.Namespace):
|
||||
match arguments.action:
|
||||
case "create":
|
||||
action.create.run(parser, arguments)
|
||||
case "init":
|
||||
action.init.run(parser, arguments)
|
||||
|
||||
# if the subcommand is not known, show the help
|
||||
case _:
|
||||
parser.print_usage()
|
2
cli/role/client/__init__.py
Normal file
2
cli/role/client/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .argparse import load_parse
|
||||
from .run import run
|
2
cli/role/client/action/__init__.py
Normal file
2
cli/role/client/action/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from . import vote
|
||||
from .argparse import load_parse
|
9
cli/role/client/action/argparse.py
Normal file
9
cli/role/client/action/argparse.py
Normal file
|
@ -0,0 +1,9 @@
|
|||
import argparse
|
||||
|
||||
|
||||
def load_parse(subparsers):
|
||||
subparser: argparse.ArgumentParser = subparsers.add_parser("vote")
|
||||
|
||||
subparser.add_argument("-u", "--username", dest="username", type=str, required=True)
|
||||
subparser.add_argument("-p", "--password", dest="password", type=str, required=True)
|
||||
subparser.add_argument("-v", "--vote", dest="vote", type=str, required=True)
|
23
cli/role/client/action/vote.py
Normal file
23
cli/role/client/action/vote.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
import argparse
|
||||
import json
|
||||
import ssl
|
||||
import socket
|
||||
|
||||
|
||||
def run(parser: argparse.ArgumentParser, arguments: argparse.Namespace):
|
||||
context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
|
||||
context.load_verify_locations(arguments.certificate)
|
||||
context.check_hostname = False
|
||||
|
||||
with (
|
||||
socket.create_connection((arguments.hostname, arguments.port)) as sock_client,
|
||||
context.wrap_socket(sock_client, server_hostname=arguments.hostname) as ssl_sock_client
|
||||
):
|
||||
data = {
|
||||
"username": arguments.username,
|
||||
"password": arguments.password,
|
||||
"vote": arguments.vote,
|
||||
}
|
||||
|
||||
ssl_sock_client.send(json.dumps(data).encode())
|
||||
print("message sent.")
|
15
cli/role/client/argparse.py
Normal file
15
cli/role/client/argparse.py
Normal file
|
@ -0,0 +1,15 @@
|
|||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from . import action
|
||||
|
||||
|
||||
def load_parse(subparsers):
|
||||
subparser: argparse.ArgumentParser = subparsers.add_parser("client")
|
||||
|
||||
subparser.add_argument("-H", "--host", dest="hostname", type=str, default="127.0.0.1")
|
||||
subparser.add_argument("-P", "--port", dest="port", type=int, default=57823)
|
||||
subparser.add_argument("-cc", "--certificate", dest="certificate", type=Path, required=False)
|
||||
|
||||
subsubparsers = subparser.add_subparsers(dest="action")
|
||||
action.load_parse(subsubparsers)
|
13
cli/role/client/run.py
Normal file
13
cli/role/client/run.py
Normal file
|
@ -0,0 +1,13 @@
|
|||
import argparse
|
||||
|
||||
from . import action
|
||||
|
||||
|
||||
def run(parser: argparse.ArgumentParser, arguments: argparse.Namespace):
|
||||
match arguments.action:
|
||||
case "vote":
|
||||
action.vote.run(parser, arguments)
|
||||
|
||||
# if the subcommand is not known, show the help
|
||||
case _:
|
||||
parser.print_usage()
|
2
cli/role/machine/__init__.py
Normal file
2
cli/role/machine/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .argparse import load_parse
|
||||
from .run import run
|
11
cli/role/machine/argparse.py
Normal file
11
cli/role/machine/argparse.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def load_parse(subparsers):
|
||||
subparser: argparse.ArgumentParser = subparsers.add_parser("machine")
|
||||
|
||||
subparser.add_argument("-H", "--host", dest="hostname", type=str, default="0.0.0.0")
|
||||
subparser.add_argument("-p", "--port", dest="port", type=int, default=57823)
|
||||
subparser.add_argument("-cc", "--certificate", dest="certificate", type=Path, required=True)
|
||||
subparser.add_argument("-ck", "--private-key", dest="private_key", type=Path, required=True)
|
23
cli/role/machine/run.py
Normal file
23
cli/role/machine/run.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
import argparse
|
||||
import json
|
||||
|
||||
|
||||
def run(parser: argparse.ArgumentParser, arguments: argparse.Namespace):
|
||||
import ssl
|
||||
import socket
|
||||
|
||||
context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER)
|
||||
context.load_cert_chain(arguments.certificate, arguments.private_key)
|
||||
|
||||
with (
|
||||
socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock_server,
|
||||
context.wrap_socket(sock_server, server_side=True) as ssl_sock_server
|
||||
):
|
||||
ssl_sock_server.bind((arguments.hostname, arguments.port))
|
||||
ssl_sock_server.listen(5)
|
||||
|
||||
while True:
|
||||
print("waiting for a connection...")
|
||||
connect, address = ssl_sock_server.accept()
|
||||
data = json.loads(connect.recv())
|
||||
print(data)
|
|
@ -1 +1 @@
|
|||
cryptography
|
||||
cryptography
|
48
source/Card.py
Normal file
48
source/Card.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
import hashlib
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
|
||||
|
||||
class Card:
|
||||
"""
|
||||
Represent the card of an elector.
|
||||
"""
|
||||
|
||||
def __init__(self, pin: str):
|
||||
self._private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048,
|
||||
)
|
||||
self.public_key: RSAPublicKey = self._private_key.public_key()
|
||||
|
||||
digest = hashes.Hash(hashes.SHA256())
|
||||
digest.update(bytes(pin, 'utf-8'))
|
||||
self.hashed_pin = digest.finalize()
|
||||
# self.activee = False
|
||||
|
||||
def check_pin(self, hashed_pin: bytes) -> bool:
|
||||
"""
|
||||
Check if the card is valid by comparing hashed PIN.
|
||||
:param hashed_pin: The PIN to check.
|
||||
:return: True if the received hash PIN matches the stored hashed PIN, False otherwise.
|
||||
"""
|
||||
return self.hashed_pin == hashed_pin
|
||||
|
||||
# NOTE(Faraphel): a PKI or a banlist / whitelist shall be checked with self.public_key
|
||||
|
||||
def reply_challenge(self, data: bytes = "encrypted data") -> bytes:
|
||||
"""
|
||||
:param data: The challeng sent by the terminal to sign.
|
||||
:return: The signed challenge.
|
||||
"""
|
||||
signature = self._private_key.sign(
|
||||
data,
|
||||
padding.PSS(
|
||||
mgf=padding.MGF1(hashes.SHA256()),
|
||||
salt_length=padding.PSS.MAX_LENGTH
|
||||
),
|
||||
hashes.SHA256()
|
||||
)
|
||||
return signature
|
56
source/Certificate.py
Normal file
56
source/Certificate.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
from datetime import datetime
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
|
||||
|
||||
class Certificate:
|
||||
"""
|
||||
A certificate.
|
||||
Wrapper around cryptography.x509.Certificate.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
issuer: str,
|
||||
issuer_private_key: bytes = None,
|
||||
subject: str,
|
||||
valid_start: datetime,
|
||||
valid_end: datetime
|
||||
):
|
||||
# generate a private key for the certificate
|
||||
self._private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||||
# get the public key from it
|
||||
self.public_key = self._private_key.public_key()
|
||||
|
||||
# if the issuer private key is not specified, consider it a self-signed certificate
|
||||
if issuer_private_key is None:
|
||||
issuer_private_key = self._private_key
|
||||
|
||||
# create a builder for the certificate
|
||||
builder = x509.CertificateBuilder(
|
||||
issuer_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, issuer)]),
|
||||
subject_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, subject)]),
|
||||
serial_number=x509.random_serial_number(),
|
||||
public_key=self.public_key,
|
||||
not_valid_before=valid_start,
|
||||
not_valid_after=valid_end,
|
||||
)
|
||||
|
||||
# create the certificate by signing it
|
||||
self._certificate = builder.sign(issuer_private_key, algorithm=hashes.SHA256())
|
||||
|
||||
def is_valid(self, date: datetime, issuer_public_key: bytes) -> bool:
|
||||
return (
|
||||
# check if the date is valid
|
||||
self._certificate.not_valid_before <= date <= self._certificate.not_valid_after and
|
||||
# check the signature of the certificate
|
||||
self._certificate.verify_directly_issued_by()
|
||||
)
|
||||
|
||||
|
||||
|
||||
# Exemple d'utilisation
|
||||
if __name__ == "__main__":
|
||||
certificate = Certificate("Example Subject")
|
297
source/Machine.py
Normal file
297
source/Machine.py
Normal file
|
@ -0,0 +1,297 @@
|
|||
import datetime
|
||||
import os
|
||||
import random
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey, RSAPrivateKey
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
|
||||
from .Card import Card
|
||||
from .models.Elector import Elector
|
||||
from .models.Proof import Proof
|
||||
|
||||
|
||||
class Machine:
|
||||
"""
|
||||
Represent a machine used to vote
|
||||
"""
|
||||
|
||||
def __init__(self, emerging_list: list[Elector]):
|
||||
# First model: list of people authorized to vote on this machine.
|
||||
# Public (que name et procuration pas empreinte) ?
|
||||
|
||||
self._private_key: RSAPrivateKey = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048,
|
||||
)
|
||||
self.public_key: RSAPublicKey = self._private_key.public_key()
|
||||
|
||||
# List of electors who can vote on this machine.
|
||||
self.emerging_list: list[Elector] = emerging_list
|
||||
|
||||
# List of people who voted.
|
||||
self.proof_list: list[Proof] = []
|
||||
|
||||
# Private votes (shuffle when a vote is inserted).
|
||||
# ["A", "B", "C"]
|
||||
# NOTE(Faraphel): rename candidates ?
|
||||
self._vote_list: list[str] = []
|
||||
|
||||
self.end_of_election: bool = False
|
||||
|
||||
def search_elector(self, pubkey: RSAPublicKey) -> Optional[Elector]:
|
||||
"""
|
||||
Search public key of elector in the emerging list.
|
||||
:param pubkey: Public key of elector to search in the emerging list.
|
||||
:return: The first elector who matches or None.
|
||||
"""
|
||||
try:
|
||||
return next(filter(
|
||||
lambda elector: elector.public_key_elector == pubkey,
|
||||
self.emerging_list
|
||||
))
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
def search_proxy_vote(self, pubkey: RSAPublicKey) -> Optional[Elector]:
|
||||
"""
|
||||
Search the elector with the public key registered as the proxy vote.
|
||||
:param pubkey: Public key of the mandataire who can proxy vote.
|
||||
:return: The elector with the proxy vote pubkey registered.
|
||||
"""
|
||||
try:
|
||||
return next(filter(
|
||||
lambda elector: elector.public_key_mandataire == pubkey,
|
||||
self.emerging_list
|
||||
))
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
def search_proof(self, pubkey: RSAPublicKey) -> Optional[Proof]:
|
||||
"""
|
||||
Search pubkey in the signature list
|
||||
:param pubkey: Public to search in the signature list.
|
||||
:return: The first elector with pubkey which voted or None.
|
||||
"""
|
||||
try:
|
||||
return next(filter(
|
||||
lambda proof: proof.public_key_elector == pubkey,
|
||||
self.proof_list
|
||||
))
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
def check_fingerprint(self, elector: Elector, fingerprint: str) -> bool:
|
||||
"""
|
||||
Hash the fingerprint and check if it authenticates the elector.
|
||||
:param elector: Elector to authenticate.
|
||||
:param fingerprint: Fingerprint to check.
|
||||
:return: True if the fingerprint authenticates the mandataire or the voter else False.
|
||||
"""
|
||||
# Is the fingerprint matches?
|
||||
digest = hashes.Hash(hashes.SHA256())
|
||||
digest.update(bytes(fingerprint, 'utf-8'))
|
||||
hashed_fingerprint = digest.finalize()
|
||||
|
||||
if (hashed_fingerprint == elector.hashed_fingerprint_mandataire or
|
||||
hashed_fingerprint == elector.hashed_fingerprint_elector):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def check_card(self, card: Card, pin: str) -> bool:
|
||||
"""
|
||||
Check if the card is valid.
|
||||
:param card:
|
||||
:param pin:
|
||||
:return:
|
||||
"""
|
||||
# Public key transmitted by the card.
|
||||
public_key = card.public_key
|
||||
|
||||
# Challenge to verify public key.
|
||||
challenge = os.urandom(128)
|
||||
signature = card.reply_challenge(challenge)
|
||||
|
||||
try:
|
||||
public_key.verify(
|
||||
signature,
|
||||
challenge,
|
||||
padding.PSS(
|
||||
mgf=padding.MGF1(hashes.SHA256()),
|
||||
salt_length=padding.PSS.MAX_LENGTH
|
||||
),
|
||||
hashes.SHA256()
|
||||
)
|
||||
except InvalidSignature:
|
||||
print("Carte invalide : échec du challenge.")
|
||||
return False
|
||||
|
||||
# Pin verification.
|
||||
digest = hashes.Hash(hashes.SHA256())
|
||||
digest.update(bytes(pin, 'utf-8'))
|
||||
hashed_pin = digest.finalize()
|
||||
if not card.check_pin(hashed_pin):
|
||||
print("Mot de passe de la carte refusé.")
|
||||
return False
|
||||
|
||||
# Card has been verified.
|
||||
return True
|
||||
|
||||
def authenticate(self, card: Card, pin: str, fingerprint: str) -> bool:
|
||||
"""
|
||||
Authenticate a person with its card and its fingerprint.
|
||||
:param card: Elector card.
|
||||
:param pin: Elector card pin.
|
||||
:param fingerprint: Elector fingerprint associated with its card public key.
|
||||
:return: True if the elector is registered in the emerging list, else False.
|
||||
"""
|
||||
# Check if election is not ended.
|
||||
if self.end_of_election:
|
||||
print("L’élection est terminée.")
|
||||
return False
|
||||
|
||||
# Check if card is valid.
|
||||
if not self.check_card(card, pin):
|
||||
print("Carte de l’électeur invalide.")
|
||||
return False
|
||||
|
||||
# Is in emerging list?
|
||||
elector = self.search_elector(card.public_key)
|
||||
if elector is None:
|
||||
elector = self.search_proxy_vote(card.public_key)
|
||||
if elector is None:
|
||||
print("L’électeur n’est pas inscrit dans la liste électorale.")
|
||||
return False
|
||||
print(f"L’électeur {elector.name} est sur la liste électorale et vote par procuration.")
|
||||
else:
|
||||
print(f"L’électeur {elector.name} est sur la liste électorale.")
|
||||
|
||||
# Is the fingerprint matching?
|
||||
if not self.check_fingerprint(elector, fingerprint):
|
||||
print(f"Erreur de reconnaissance de l’empreinte digitale de {elector.name}.")
|
||||
return False
|
||||
|
||||
# L’électeur est authentifié.
|
||||
print(f"L’électeur {elector.name} est authentifié")
|
||||
return True
|
||||
|
||||
def vote(self, card: Card, vote: str) -> bool:
|
||||
"""
|
||||
Vote with the card (user must have been authenticated previously).
|
||||
:param card: Elector card
|
||||
:param vote: Elector vote or proxy vote
|
||||
:return: True if the vote has been inserted else False.
|
||||
"""
|
||||
# Check if election is not ended.
|
||||
if self.end_of_election:
|
||||
print("L’élection est terminée.")
|
||||
return False
|
||||
|
||||
# Check if elector can vote and has not vot.
|
||||
|
||||
# Vérification si l’électeur peut voter pour lui.
|
||||
elector = self.search_elector(card.public_key)
|
||||
if elector is None:
|
||||
|
||||
# Vérification si l’électeur est un mandataire.
|
||||
elector = self.search_proxy_vote(card.public_key)
|
||||
if elector is None:
|
||||
# Ne devrait pas arriver si l’utilisateur est authentifiée.
|
||||
print("L’électeur n’est pas inscrit dans la liste électorale.")
|
||||
return False
|
||||
|
||||
else:
|
||||
# Vérification si le mandataire a déjà voté pour l’électeur.
|
||||
proof = self.search_proof(elector.public_key_elector)
|
||||
if proof is not None:
|
||||
print(f"L’électeur {elector.name} a déjà voté.")
|
||||
return False
|
||||
|
||||
# C’est ok
|
||||
print(f"L’électeur {elector.name} vote par procuration.")
|
||||
|
||||
else:
|
||||
# Vérification si l’électeur a déjà voté pour lui.
|
||||
proof = self.search_proof(elector.public_key_elector)
|
||||
if proof is not None:
|
||||
print(f"L’électeur {elector.name} a déjà voté.")
|
||||
return False
|
||||
|
||||
# C’est ok
|
||||
print(f"L’électeur {elector.name} peut voter.")
|
||||
|
||||
# Create proof of voting: date + pubkey elector + pubkey mandataire.
|
||||
date = datetime.datetime.now()
|
||||
pem_votant: bytes = elector.public_key_elector.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.PKCS1,
|
||||
)
|
||||
pem_mandataire: bytes = elector.public_key_mandataire.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.PKCS1,
|
||||
)
|
||||
concatenation: bytes = bytes(str(date), 'utf-8') + pem_votant + pem_mandataire
|
||||
|
||||
# Create proof signature.
|
||||
proof_signature = self._private_key.sign(
|
||||
concatenation,
|
||||
padding.PSS(
|
||||
mgf=padding.MGF1(hashes.SHA256()),
|
||||
salt_length=padding.PSS.MAX_LENGTH
|
||||
),
|
||||
hashes.SHA256()
|
||||
)
|
||||
proof = Proof(date,
|
||||
elector.public_key_elector,
|
||||
elector.public_key_mandataire,
|
||||
proof_signature)
|
||||
print(f"Preuve de vote de {elector.name} ajoutée.")
|
||||
|
||||
# Append proof and vote to the databases.
|
||||
self.proof_list.append(proof)
|
||||
self._vote_list.append(vote)
|
||||
random.shuffle(self._vote_list)
|
||||
|
||||
print(f"Vote de {elector.name} comptabilisé(e)!")
|
||||
return True
|
||||
|
||||
def cloture_du_vote(self) -> tuple[list[str], bytes]:
|
||||
"""
|
||||
End of the election, publishes the voting results.
|
||||
:return: The list of votes and its signature by the voting machine.
|
||||
"""
|
||||
# Election is closed
|
||||
self.end_of_election = True
|
||||
|
||||
data = bytes("".join(self._vote_list), 'utf-8')
|
||||
vote_list_signature = self._private_key.sign(
|
||||
data,
|
||||
padding.PSS(
|
||||
mgf=padding.MGF1(hashes.SHA256()),
|
||||
salt_length=padding.PSS.MAX_LENGTH
|
||||
),
|
||||
hashes.SHA256()
|
||||
)
|
||||
return self._vote_list, vote_list_signature
|
||||
|
||||
def print_signing_list(self) -> None:
|
||||
...
|
||||
|
||||
def print_emerging_list(self) -> None:
|
||||
...
|
||||
|
||||
# def to_bytes(public_key_votant, public):
|
||||
# pem_votant: bytes = self.public_key_votant.public_bytes(
|
||||
# encoding=serialization.Encoding.PEM,
|
||||
# format=serialization.PublicFormat.PKCS1,
|
||||
# )
|
||||
# pem_mandataire: bytes = self.public_key_mandataire.public_bytes(
|
||||
# encoding=serialization.Encoding.PEM,
|
||||
# format=serialization.PublicFormat.PKCS1,
|
||||
# )
|
||||
# return bytes(str(self.date), 'utf-8') + pem_votant + pem_mandataire
|
|
@ -4,23 +4,54 @@ from cryptography import x509
|
|||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
|
||||
|
||||
# ETAT
|
||||
|
||||
|
||||
# generate a private key for the certificate
|
||||
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||||
admin_private_key = rsa.generate_private_key(public_exponent=65537, key_size=4096)
|
||||
# get the public key from it
|
||||
public_key = private_key.public_key()
|
||||
admin_public_key = admin_private_key.public_key()
|
||||
|
||||
# create a builder for the certificate
|
||||
builder = x509.CertificateBuilder(
|
||||
issuer_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "vote.gouv.fr")]),
|
||||
subject_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "vote.gouv.fr")]),
|
||||
serial_number=x509.random_serial_number(),
|
||||
public_key=public_key,
|
||||
public_key=admin_public_key,
|
||||
not_valid_before=datetime.now(),
|
||||
not_valid_after=datetime.now() + timedelta(weeks=1),
|
||||
)
|
||||
|
||||
# create the certificate by signing it
|
||||
certificate = builder.sign(private_key, algorithm=hashes.SHA256())
|
||||
admin_certificate = builder.sign(admin_private_key, algorithm=hashes.SHA256())
|
||||
|
||||
|
||||
print(certificate)
|
||||
print(admin_certificate)
|
||||
|
||||
|
||||
# BUREAU
|
||||
|
||||
|
||||
# generate a private key for the certificate
|
||||
machine_private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||||
# get the public key from it
|
||||
machine_public_key = machine_private_key.public_key()
|
||||
|
||||
# create a builder for the certificate
|
||||
builder = x509.CertificateBuilder(
|
||||
issuer_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "vote.gouv.fr")]),
|
||||
subject_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "machine.vote.gouv.fr")]),
|
||||
serial_number=x509.random_serial_number(),
|
||||
public_key=machine_public_key,
|
||||
not_valid_before=datetime.now(),
|
||||
not_valid_after=datetime.now() + timedelta(weeks=1),
|
||||
)
|
||||
|
||||
# create the certificate by signing it
|
||||
machine_certificate = builder.sign(admin_private_key, algorithm=hashes.SHA256())
|
||||
|
||||
|
||||
print(machine_certificate)
|
||||
# check that the machine
|
||||
machine_certificate.verify_directly_issued_by(admin_certificate)
|
||||
|
|
47
source/models/Elector.py
Normal file
47
source/models/Elector.py
Normal file
|
@ -0,0 +1,47 @@
|
|||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
|
||||
|
||||
class Elector:
|
||||
# [
|
||||
# {
|
||||
# "name": "Bob",
|
||||
# "votant": (pub_key_bob, Hash(empreinte_bob)),
|
||||
# "mandataire": (b"", b"")
|
||||
# },
|
||||
# {
|
||||
# "name": "Alice",
|
||||
# "votant": (pub_key_alice, Hash(empreinte_alice)),
|
||||
# "mandataire": (pub_key_Eve, Hash(empreinte_eve)), # Eve peut voter pour Alice.
|
||||
# }
|
||||
# ]
|
||||
|
||||
def __init__(self,
|
||||
name: str,
|
||||
public_key_elector: RSAPublicKey,
|
||||
fingerprint_elector: str,
|
||||
public_key_mandataire: Optional[RSAPublicKey] = None,
|
||||
fingerprint_mandataire: Optional[str] = None):
|
||||
self.name = name
|
||||
self.public_key_elector = public_key_elector
|
||||
|
||||
if public_key_mandataire is None:
|
||||
self.public_key_mandataire = public_key_elector
|
||||
else:
|
||||
self.public_key_mandataire = public_key_mandataire
|
||||
|
||||
digest = hashes.Hash(hashes.SHA256())
|
||||
digest.update(bytes(fingerprint_elector, 'utf-8'))
|
||||
self.hashed_fingerprint_elector: bytes = digest.finalize()
|
||||
|
||||
if fingerprint_mandataire is None:
|
||||
self.hashed_fingerprint_mandataire = self.hashed_fingerprint_elector
|
||||
else:
|
||||
digest = hashes.Hash(hashes.SHA256())
|
||||
digest.update(bytes(fingerprint_mandataire, 'utf-8'))
|
||||
self.hashed_fingerprint_mandataire: bytes = digest.finalize()
|
||||
|
||||
def set_mandataire(self, public_key: bytes, hashed_fingerprint: bytes):
|
||||
self.public_key_mandataire = public_key
|
||||
self.hashed_fingerprint_mandataire = hashed_fingerprint
|
24
source/models/Proof.py
Normal file
24
source/models/Proof.py
Normal file
|
@ -0,0 +1,24 @@
|
|||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class Proof:
|
||||
|
||||
date: datetime
|
||||
public_key_elector: RSAPublicKey
|
||||
public_key_mandataire: RSAPublicKey
|
||||
proof_signature: bytes
|
||||
|
||||
# def to_bytes(public_key_votant, public):
|
||||
# pem_votant: bytes = self.public_key_votant.public_bytes(
|
||||
# encoding=serialization.Encoding.PEM,
|
||||
# format=serialization.PublicFormat.PKCS1,
|
||||
# )
|
||||
# pem_mandataire: bytes = self.public_key_mandataire.public_bytes(
|
||||
# encoding=serialization.Encoding.PEM,
|
||||
# format=serialization.PublicFormat.PKCS1,
|
||||
# )
|
||||
# return bytes(str(self.date), 'utf-8') + pem_votant + pem_mandataire
|
0
source/models/__init__.py
Normal file
0
source/models/__init__.py
Normal file
82
tests/exemple1.py
Normal file
82
tests/exemple1.py
Normal file
|
@ -0,0 +1,82 @@
|
|||
from source.Card import Card
|
||||
from source.models.Elector import Elector
|
||||
from source.Machine import Machine
|
||||
|
||||
# Vote électronique
|
||||
|
||||
# 3 personnes
|
||||
|
||||
alice = {
|
||||
"name": "Alice",
|
||||
"password": "6060",
|
||||
"empreinte": "empreinteA",
|
||||
"vote": "Riri",
|
||||
}
|
||||
|
||||
bob = {
|
||||
"name": "Bob",
|
||||
"password": "0100",
|
||||
"empreinte": "empreinteB",
|
||||
"vote": "Toto",
|
||||
}
|
||||
|
||||
eve = {
|
||||
"name": "Eve",
|
||||
"password": "008",
|
||||
"empreinte": "empreinteE",
|
||||
"vote": "Jack",
|
||||
}
|
||||
|
||||
personnes = [alice, bob, eve]
|
||||
|
||||
# Création des cartes
|
||||
|
||||
alice["card"] = Card(alice["password"])
|
||||
bob["card"] = Card(bob["password"])
|
||||
eve["card"] = Card(eve["password"])
|
||||
print(f"Liste de cartes : {[personne['card'] for personne in personnes]}")
|
||||
print("Cartes d’élections créées.")
|
||||
print()
|
||||
|
||||
# Création de la liste électorale
|
||||
|
||||
# Eve peut voter pour Alice
|
||||
alice_elector = Elector(name=alice["name"],
|
||||
public_key_elector=alice["card"].public_key, fingerprint_elector=alice["empreinte"],
|
||||
public_key_mandataire=eve["card"].public_key, fingerprint_mandataire=eve["empreinte"])
|
||||
bob_elector = Elector(name=bob["name"],
|
||||
public_key_elector=bob["card"].public_key, fingerprint_elector=bob["empreinte"])
|
||||
|
||||
emerging_list = [alice_elector, bob_elector]
|
||||
print(f"Liste d’émargement: {emerging_list}")
|
||||
print("Liste électorale créée.")
|
||||
print()
|
||||
|
||||
# Création de la machine
|
||||
|
||||
machine = Machine(emerging_list)
|
||||
print(f"Machine pubkey : {machine.public_key}")
|
||||
print("Machine pour voter créée")
|
||||
print()
|
||||
|
||||
# Votes des personnes
|
||||
alice["password"] = "fleur"
|
||||
|
||||
for personne in personnes:
|
||||
# Authentification
|
||||
print()
|
||||
print(f"La personne {personne["name"]} s’authentifie avec le mdp {personne["password"]} et l’empreinte {personne["empreinte"]}.")
|
||||
if not machine.authenticate(personne["card"], personne["password"], personne["empreinte"]):
|
||||
continue
|
||||
|
||||
# Vote
|
||||
print()
|
||||
print(f"La personne {personne["name"]} va voter pour {personne["vote"]}")
|
||||
if not machine.vote(personne["card"], personne["vote"]):
|
||||
continue
|
||||
|
||||
# Publication des résultats
|
||||
|
||||
votes, sig_votes = machine.cloture_du_vote()
|
||||
print()
|
||||
print(votes)
|
2
tests/exemple2.py
Normal file
2
tests/exemple2.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
|
||||
# Vote à distance
|
27
tests/schema.md
Normal file
27
tests/schema.md
Normal file
|
@ -0,0 +1,27 @@
|
|||
# Initialisation
|
||||
|
||||
Il à été distribué dans chaque bureau de votes une machine qui servira d'urne
|
||||
|
||||
L'Administrateur à son propre certificat admin
|
||||
L'administrateur crée un certificat pour chaque machine dans les bureaux de votes
|
||||
|
||||
Tout le monde récupère une carte leur permettant de voter dans leur bureau de vote respectif avec
|
||||
un code pin et leurs empreinte digitale
|
||||
|
||||
# Voter
|
||||
|
||||
L'électeur ou son représentant se présente au bureau de vote avec la carte
|
||||
|
||||
Il s'authentifie auprès de la machine
|
||||
|
||||
Il choisi son candidat
|
||||
La machine enregistre son choix et le fait qu'il à voter de manière indépendante.
|
||||
|
||||
# Publication
|
||||
|
||||
A la fin du vote, les votes sont publiés et visible par tous.
|
||||
|
||||
# Difference avec le vote electronique
|
||||
|
||||
On etabli la connexion TLS au debut avant de realiser l'authentification
|
||||
|
Loading…
Reference in a new issue