diff --git a/src/packages/authorities/__init__.py b/src/packages/authorities/__init__.py new file mode 100644 index 0000000..78d4c61 --- /dev/null +++ b/src/packages/authorities/__init__.py @@ -0,0 +1,4 @@ +from .elPapa import ElPapa +from .pedroSanchez import PedroSanchez +from .ursula import Ursula +from .certificate import Certificate diff --git a/src/packages/authorities/certificate.py b/src/packages/authorities/certificate.py new file mode 100644 index 0000000..b5b4de2 --- /dev/null +++ b/src/packages/authorities/certificate.py @@ -0,0 +1,26 @@ +from cryptography import x509 + +class Certificate: + def __init__(self, certificate: x509.Certificate, issuer_certificate = None) -> None: + + self.__issuer_certificate = issuer_certificate + + if issuer_certificate is None: + self.__issuer_certificate = certificate + + self.__certificate = certificate + + + @property + def issuer_certificate(self) -> x509.Certificate: + return self.__issuer_certificate + + @property + def certificate(self) -> x509.Certificate: + return self.__certificate + + def __str__(self) -> str: + return f"\nCertificate: {str(self.__certificate)} - Issuer: {str(self.issuer_certificate)}" + + def __repr__(self) -> str: + return self.__str__() \ No newline at end of file diff --git a/src/packages/authorities/elPapa.py b/src/packages/authorities/elPapa.py new file mode 100644 index 0000000..0433750 --- /dev/null +++ b/src/packages/authorities/elPapa.py @@ -0,0 +1,80 @@ +import datetime +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from .certificate import Certificate +from .singleton import singleton + +@singleton +class ElPapa: + def __init__(self): + self.__private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + + self.__subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "VA"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Vaticano"), + x509.NameAttribute(NameOID.LOCALITY_NAME, "Roma"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Cristianismo"), + x509.NameAttribute(NameOID.COMMON_NAME, "Dios.com"), + ]) + + x509certificate = x509.CertificateBuilder().subject_name( + self.__subject + ).issuer_name( + self.__subject + ).public_key( + self.__private_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.now(datetime.timezone.utc) + ).not_valid_after( + # Our certificate will be valid for 10 days + datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=10) + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName("localhost")]), + critical=False, + # Sign our certificate with our private key + ).sign(self.__private_key, hashes.SHA256()) + + self.__certificate = Certificate(x509certificate) + + @property + def certificate(self): + return self.__certificate + + @property + def trusted_certs(self): + return [self.__certificate] + + def info(self): + return self.__subject + + + def issueCertificate(self, csr) -> x509.Certificate: + x509certificate = x509.CertificateBuilder().subject_name( + csr.subject + ).issuer_name( + self.__subject + ).public_key( + csr.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.now(datetime.timezone.utc) + ).not_valid_after( + # Our certificate will be valid for 10 days + datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=10) + ).sign(self.__private_key, hashes.SHA256()) + + certificate = Certificate(x509certificate, self.__certificate) + + return certificate + +if __name__ == '__main__': + a = ElPapa() + print(a.certificate) \ No newline at end of file diff --git a/src/packages/authorities/pedroSanchez.py b/src/packages/authorities/pedroSanchez.py new file mode 100644 index 0000000..bef66de --- /dev/null +++ b/src/packages/authorities/pedroSanchez.py @@ -0,0 +1,64 @@ +import datetime +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from .elPapa import ElPapa +from .certificate import Certificate +from .singleton import singleton + +@singleton +class PedroSanchez: + def __init__(self) -> None: + self.__private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + self.__subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Madrid"), + x509.NameAttribute(NameOID.LOCALITY_NAME, "Madrid"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "PSOE"), + x509.NameAttribute(NameOID.COMMON_NAME, "Presidente de españa"), + ]) + + csr = x509.CertificateSigningRequestBuilder().subject_name( + self.__subject + ).sign(self.__private_key, hashes.SHA256()) + + elpapa = ElPapa() + + self.__certificate = elpapa.issueCertificate(csr) + + self.__trusted_certs = [self.__certificate] + elpapa.trusted_certs + + @property + def trusted_certs(self): + return self.__trusted_certs + + @property + def certificate(self): + return self.__certificate + + def issueCertificate(self, csr) -> x509.Certificate: + certificate = x509.CertificateBuilder().subject_name( + csr.subject + ).issuer_name( + self.__subject + ).public_key( + csr.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.now(datetime.timezone.utc) + ).not_valid_after( + # Our certificate will be valid for 10 days + datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=10) + ).sign(self.__private_key, hashes.SHA256()) + + + return Certificate(certificate, self.__certificate) + + + + \ No newline at end of file diff --git a/src/packages/authorities/singleton.py b/src/packages/authorities/singleton.py new file mode 100644 index 0000000..17597ac --- /dev/null +++ b/src/packages/authorities/singleton.py @@ -0,0 +1,7 @@ +def singleton(class_): + instances = {} + def getinstance(*args, **kwargs): + if class_ not in instances: + instances[class_] = class_(*args, **kwargs) + return instances[class_] + return getinstance diff --git a/src/packages/authorities/ursula.py b/src/packages/authorities/ursula.py new file mode 100644 index 0000000..a16aa6e --- /dev/null +++ b/src/packages/authorities/ursula.py @@ -0,0 +1,64 @@ +import datetime +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from .elPapa import ElPapa + +from .certificate import Certificate +from .singleton import singleton + +@singleton +class Ursula: + def __init__(self) -> None: + self.__private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + + self.__subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "DE"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Berlin"), + x509.NameAttribute(NameOID.LOCALITY_NAME, "Berlin"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Mercedes"), + x509.NameAttribute(NameOID.COMMON_NAME, "doishnewneitz.com"), + ]) + + csr = x509.CertificateSigningRequestBuilder().subject_name( + self.__subject + ).sign(self.__private_key, hashes.SHA256()) + + elpapa = ElPapa() + + self.__certificate = elpapa.issueCertificate(csr) + self.__trusted_certs = [self.__certificate] + elpapa.trusted_certs + + @property + def trusted_certs(self): + return self.__trusted_certs + + @property + def certificate(self): + return self.__certificate + + def issueCertificate(self, csr): + certificate = x509.CertificateBuilder().subject_name( + csr.subject + ).issuer_name( + self.__subject + ).public_key( + csr.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.now(datetime.timezone.utc) + ).not_valid_after( + # Our certificate will be valid for 10 days + datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=10) + ).sign(self.__private_key, hashes.SHA256()) + + return Certificate(certificate, self.__certificate) + + + + \ No newline at end of file diff --git a/src/packages/client.py b/src/packages/client.py index a735162..a5cf43d 100644 --- a/src/packages/client.py +++ b/src/packages/client.py @@ -1,19 +1,86 @@ from packages.server import Server, ImgPackage from packages.imgproc import * from PIL import Image +import datetime +from cryptography import x509 +from cryptography.x509.oid import NameOID from packages.imgproc.img_cripto_utils import ImageCryptoUtils from cryptography.hazmat.primitives.asymmetric import rsa, padding -from cryptography.hazmat.primitives import hashes, serialization + +from cryptography.hazmat.primitives import hashes +from packages.authorities import PedroSanchez, Certificate +import logging + + class Client: def __init__(self): + # logging + self.logger = logging.getLogger('Client') + self.logger.setLevel(logging.DEBUG) + file_handler = logging.FileHandler('SYSTEM.log') + file_handler.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + file_handler.setFormatter(formatter) + self.logger.addHandler(file_handler) + self.username = None self.password = None + self.__plain_pass = None self.encryptor = None - self._server = Server() - self.__private_key = None - self._public_key = None + + self.__server = Server() + self.__private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + self.__subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "ES"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Madrid"), + x509.NameAttribute(NameOID.LOCALITY_NAME, "Colmenarejo"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "UC3M"), + x509.NameAttribute(NameOID.COMMON_NAME, "uc3m.com"), + ]) + # para probar certificado auto firmado borrar desde aqui hasta el comentario + csr = x509.CertificateSigningRequestBuilder().subject_name( + self.__subject + ).sign(self.__private_key, hashes.SHA256()) + pedroSanchez = PedroSanchez() + self.__certificate = pedroSanchez.issueCertificate(csr) + + self.__trusted_certs = [self.__certificate] + pedroSanchez.trusted_certs + + """ # test certificado auto firmado + self.__certificate = x509.CertificateBuilder().subject_name( + self.__subject + ).issuer_name( + self.__subject + ).public_key( + self.__private_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.now(datetime.timezone.utc) + ).not_valid_after( + # Our certificate will be valid for 10 days + datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=10) + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName("localhost")]), + critical=False, + # Sign our certificate with our private key + ).sign(self.__private_key, hashes.SHA256()) + + self.__certificate = Certificate(self.__certificate) + + self.__trusted_certs = [self.__certificate] + """ + # FIXME remove this + @property + def server(self): + return self.__server + + def get_images(self, num: int | None = -1, username: str | None = None, date: str | None = None, time: str | None = None) -> list: @@ -55,7 +122,7 @@ def get_images(self, num: int | None = -1, username: str | None = None, decrypted_images = [] progress = 0 for im in images: - decrypted = ImageCryptoUtils.decrypt(im.image, self.password) + decrypted = ImageCryptoUtils.decrypt(im.image, self.__plain_pass) new = ImgPackage(im.author, im.date, im.time, im.path, decrypted) decrypted_images.append(new) yield round((progress / len(images)) * 100, 2), new @@ -70,7 +137,29 @@ def register(self, name: str, password: str) -> None: password (str): password of the user """ - return self._server.create_user(name, password) + self.logger.info(" Registering user...") + self.logger.info(" Checking servers certificate...") + if not self.__check_servers_certificate(self.__server.certificate): + raise Exception("Servers certificate not trusted") + self.logger.info(" Servers certificate is trusted") + self.logger.info(" Obtaining servers public key...") + + servers_pk = self.__server.certificate.certificate.public_key() + # encrypt password with public key + + self.logger.info(" Encrypting password...") + password = password.encode() + password = servers_pk.encrypt( + password, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ).hex() + self.logger.info(" Password encrypted and sended to server") + return self.__server.create_user(name, password) + def logout(self): """ @@ -89,17 +178,38 @@ def login(self, name: str, password: str): bool: True if the user was logged in, False otherwise """ - if self._server.login(name, password): + self.logger.info(" Logging in...") + self.logger.info(" Checking servers certificate...") + if not self.__check_servers_certificate(self.__server.certificate): + raise Exception("Servers certificate not trusted") + self.logger.info(" Servers certificate is trusted") + # encrypt password with public key + self.logger.info(" Encrypting password...") + servers_pk = self.__server.certificate.certificate.public_key() + # encrypt password with public key + + self.logger.info(" Encrypting password...") + plain_pass = password + password = password.encode() + password = servers_pk.encrypt( + password, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ).hex() + + self.logger.info(" Password encrypted and sended to server") + if self.__server.login(name, password): + self.__plain_pass = plain_pass self.username = name self.password = password - self.__private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=4096, - ) - self._public_key = self.__private_key.public_key() + self.logger.info(" Logged in") else: + self.logger.info(" User or password incorrect") raise ValueError("User or password incorrect") def remove_user(self) -> None: @@ -116,6 +226,7 @@ def upload_photo(self, path: str, x: int = 0, y: int = 0, w: int = 200, h: int = w (int, optional): width of the square to encrypt. Defaults to 200. h (int, optional): height of the square to encrypt. Defaults to 200. """ + self.logger.info(" Uploading image...") # check if image is png if not path.endswith(".png"): raise Exception("Image must be a PNG") @@ -124,28 +235,25 @@ def upload_photo(self, path: str, x: int = 0, y: int = 0, w: int = 200, h: int = image = Image.open(path) except: raise Exception("Image could not be opened check path and format") - # encrypt image - # generate users AES key - + self.logger.info(" Image valid...") + self.logger.info(" Encrypting image...") + self.logger.info(" Checking servers certificate...") + if not self.__check_servers_certificate(self.__server.certificate): + raise Exception("Servers certificate not trusted") + self.logger.info(" Servers certificate is trusted") + + self.logger.info(" obtaining servers public key...") + servers_pk = self.__server.certificate.certificate.public_key() # encrypt image - image = ImageCryptoUtils.encrypt(image, self.password, x, y, w, h) - img_hash = ImageCryptoUtils.generate_image_hash(image) - # firmar hash - signature = self.__private_key.sign( - img_hash, - padding.PSS( - mgf=padding.MGF1(hashes.SHA256()), - salt_length=padding.PSS.MAX_LENGTH - ), - hashes.SHA256() - ) - pem = self._public_key.public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo - ) - ImageCryptoUtils._write_metadata(image, {"signature":signature.hex(), "public_key": pem.hex()} ) + + self.logger.info(" Encrypting image...") + image = ImageCryptoUtils.encrypt(image, self.__plain_pass, x, y, w, h) + ImageCryptoUtils.generate_image_hash(image, self.__private_key, servers_pk) + self.logger.info(" Image encrypted") + self.logger.info(" Uploading image...") # upload image - return self._server.store_image(image, self.username, self.password) + self.__server.store_image(image, self.username, self.password, self.__certificate) + self.logger.info(" Image uploaded successfully") def remove_image(self, date: str, time: str) -> None: """Removes the image with the given name @@ -153,4 +261,26 @@ def remove_image(self, date: str, time: str) -> None: date (str): date of the image time (str): time of the image """ - return self._server.remove_image(self.username, self.password, date, time) + + return self.__server.remove_image(self.username, self.password, date, time) + + def __check_servers_certificate(self, certificate: Certificate) -> bool: + """ + Checks if the servers certificate is trusted + :param certificate: certificate to check + :return: True if the certificate is trusted, False otherwise + """ + # check servers certificate + serv_cert = self.__server.certificate + # is a trusted certificate? + trusted = False + while not trusted: + if isinstance(certificate, Certificate): + trusted = certificate in self.__trusted_certs + certificate = certificate.issuer_certificate + elif isinstance(certificate, x509.Certificate): + trusted = certificate in self.__trusted_certs + break + else: + raise ValueError("Servers certificate not valid") + return trusted diff --git a/src/packages/imgproc/img_cripto_utils.py b/src/packages/imgproc/img_cripto_utils.py index eb483d4..1e075c4 100644 --- a/src/packages/imgproc/img_cripto_utils.py +++ b/src/packages/imgproc/img_cripto_utils.py @@ -1,15 +1,31 @@ +from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.hazmat.primitives import hashes, hmac from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from .imgproc import * import os +def cache(func): + saved = {} + def wraps(*args): + img = args[0].info + passw = args[1] + nhash = str((img, passw)) + if nhash in saved: + return saved[nhash] + else: + res = func(*args) + saved[nhash] = res + return res + + return wraps class ImageCryptoUtils: def __init__(self) -> None: pass @staticmethod + @cache def decrypt(img: Image, password: str) -> Image: """ Decrypts an image using AES-192 in CTR mode @@ -127,7 +143,7 @@ def __read_metadata(img: Image) -> dict: return img.info @staticmethod - def generate_image_hash(img: Image) -> None: + def generate_image_hash(img: Image, private_key: rsa.RSAPrivateKey, server_public_key: rsa.RSAPublicKey ) -> None: """ Generates a hash of the image and writes it in the metadata :param img: image @@ -143,5 +159,25 @@ def generate_image_hash(img: Image) -> None: # el key debe ir encriptado con RSA del server h.update(img_bytes + iv + salt + key) img_hash = h.finalize() - ImageCryptoUtils._write_metadata(img, {"hash_key": key.hex()}) - return img_hash + + signature = private_key.sign( + img_hash, + padding.PSS( + mgf=padding.MGF1(hashes.SHA256()), + salt_length=padding.PSS.MAX_LENGTH + ), + hashes.SHA256() + ) + + # encrypt key with server public key + key = server_public_key.encrypt( + key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + + ImageCryptoUtils.__write_metadata(img, {"hash": img_hash.hex(), "signature": signature.hex(),"key": key.hex()}) + diff --git a/src/packages/server/data/images/admin/2023/11/27/14_00_02.png b/src/packages/server/data/images/admin/2023/11/27/14_00_02.png new file mode 100644 index 0000000..c53a86b Binary files /dev/null and b/src/packages/server/data/images/admin/2023/11/27/14_00_02.png differ diff --git a/src/packages/server/data/users.json b/src/packages/server/data/users.json index 55297cc..e69de29 100644 --- a/src/packages/server/data/users.json +++ b/src/packages/server/data/users.json @@ -1,7 +0,0 @@ -[ - { - "name": "admin", - "password": "526560d5f8e6c94755c20b27e21ab3ab33cd02f632ef811604abb62778ce74d2", - "salt_p": "b8a5101af96448879d51f5a30ca30f7a" - } -] \ No newline at end of file diff --git a/src/packages/server/server.py b/src/packages/server/server.py index bcafc9f..bd10a4e 100644 --- a/src/packages/server/server.py +++ b/src/packages/server/server.py @@ -8,13 +8,64 @@ import re import uuid - +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.primitives.asymmetric import rsa +from packages.authorities.ursula import Ursula +from packages.authorities.certificate import Certificate +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.exceptions import InvalidSignature +import logging class Server(): def __init__(self) -> None: + # logging + # Configura el logger para la clase Server + self.logger = logging.getLogger('Server') + self.logger.setLevel(logging.DEBUG) + + # Crea un controlador para guardar logs en un archivo llamado server.log + file_handler = logging.FileHandler('SYSTEM.log') + file_handler.setLevel(logging.INFO) + + # Crea un formateador para los logs + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + file_handler.setFormatter(formatter) + + # Agrega el controlador al logger de la clase Server + self.logger.addHandler(file_handler) + self.__sm = StorageManager() self.__sm.create_directories() + self.__subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "AL"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Germany"), + x509.NameAttribute(NameOID.LOCALITY_NAME, "Munich"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "bmw"), + x509.NameAttribute(NameOID.COMMON_NAME, "bmwaitzniert.com"), + ]) + self.__private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + csr = x509.CertificateSigningRequestBuilder().subject_name( + self.__subject + ).sign(self.__private_key, hashes.SHA256()) + ursula = Ursula() + self.__certificate = ursula.issueCertificate(csr) + + self.__trusted_certs = [self.__certificate] + ursula.trusted_certs + + @property + def trusted_certs(self): + return self.__trusted_certs + + + @property + def certificate(self): + return self.__certificate + def __get_users(self) -> list: """Returns the list of users Returns: @@ -41,14 +92,26 @@ def create_user(self, name, password) -> None: password (str): password of the user (hashed) """ # check if name is unique + self.logger.info(" Creating user...") + self.logger.info(" Checking if name is unique...") users = self.__get_users() for user in users: if user.name == name: raise ValueError("Name is already taken") - - - # TODO la contraseña estara encriptada con RSA y el servidor tendra la clave privada - # TODO desencriptar la contraseña con la clave privada del servidor + self.logger.info(" Name is unique") + self.logger.info(" Decrypting password...") + # decrypt password + password = self.__private_key.decrypt( + bytes.fromhex(password), + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ).decode() + self.logger.info(" Password decrypted") + self.logger.info(" Checking password...") + # comprobar que la contraseña cumple los requisitos # 12 caracteres, 1 mayuscula, 1 minuscula, 1 numero, 1 caracter especial if len(password) < 12: @@ -62,6 +125,8 @@ def create_user(self, name, password) -> None: elif not re.search("[!@#$%^&*()_+-={};':\"\\|,.<>/?]", password): raise ValueError("Password must contain at least one special character") + self.logger.info(" Password is valid") + self.logger.info(" Generating passwords KDF...") # KDF de la contraseña salt_p = uuid.uuid4().hex # son 16 bytes = 198 bits kdf = Scrypt( @@ -72,10 +137,13 @@ def create_user(self, name, password) -> None: p = 1 ) password = kdf.derive(bytes(password, "utf-8")).hex() + self.logger.info(" Password KDF generated") + self.logger.info(" Generating user...") # create user users = self.__get_users() users.append(User(name, password, salt_p)) self.__sm.update_users_json(users) + self.logger.info(" User created") def remove_user(self, name: str, password: str): @@ -95,13 +163,15 @@ def remove_user(self, name: str, password: str): else: raise ValueError("User not found") - def store_image(self, image: Image, user_name, password): + def store_image(self, image: Image, user_name, password, certificate: Certificate): """ Stores the image in the server, IMAGE FORMAT: PNG Args: image_path (str): path to the image camera_name (str): name of the camera user_name (str): name of the owner """ + self.logger.info(" received image") + if user_name == "" or user_name is None: raise ValueError("User cannot be empty") if image is None: @@ -109,18 +179,31 @@ def store_image(self, image: Image, user_name, password): # check if owner is valid and if password is correct + self.logger.info(" Checking users credentials...") if not self.__authenticate(user_name, password): raise ValueError("User or password incorrect") + self.logger.info(" Users credentials are valid") - - # checK tags #TODO - pass - # check hash signature #TODO - # get metadata + self.logger.info(" Checking image integrity") image_metadata = image.info + hash = image_metadata["hash"] + key = bytes.fromhex(image_metadata["key"]) + # desencriptar la clave con la clave privada del servidor + self.logger.info(" Checking image hash...") + self.logger.info(" Decrypting hash key...") + key = self.__private_key.decrypt( + key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + # regenerate hash + self.logger.info(" Regenerating hash...") img_bytes = image.tobytes() iv = bytes.fromhex(image_metadata["aes_iv"]) salt = bytes.fromhex(image_metadata["aes_key_salt"]) @@ -130,18 +213,33 @@ def store_image(self, image: Image, user_name, password): h.update(img_bytes + iv + salt + key) img_hash = h.finalize() + if hash != img_hash.hex(): + raise ValueError("Hashes do not match") + self.logger.info(" Hashes match") - # check hash signature - hash_sign = image_metadata["signature"] - # get public key # FIXME - public_key = image_metadata["public_key"] - public_key = serialization.load_pem_public_key( - bytes.fromhex(public_key) - ) - # decript hash signature + # check certificate + self.logger.info(" Checking client certificate...") + certificate_pk = certificate.certificate.public_key() + trusted = False + while not trusted: + if isinstance(certificate, Certificate): + trusted = certificate in self.__trusted_certs + certificate = certificate.issuer_certificate + elif isinstance(certificate, x509.Certificate): + trusted = certificate in self.__trusted_certs + break + else: + raise ValueError("Certificate not valid") + + if not trusted: + raise ValueError("Certificate not trusted") + + # check sign with public key + self.logger.info(" Checking signature...") + signature = bytes.fromhex(image_metadata["signature"]) try: - public_key.verify( - bytes.fromhex(hash_sign), + certificate_pk.verify( + signature, img_hash, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), @@ -150,27 +248,25 @@ def store_image(self, image: Image, user_name, password): hashes.SHA256() ) except InvalidSignature: - print("Invalid signature") - return - + raise ValueError("Signature not valid") + self.logger.info(" Signature is valid") - # check certificate #TODO - pass - # store image - - # dev and debug purposes image.load() + self.logger.info(" Image integrity is valid") + self.logger.info(" Storing image...") # META DATA # copy metadata from original image to new image info = PngImagePlugin.PngInfo() for key, value in image.info.items(): info.add_text(str(key), str(value)) - # add new metadata + + # store image self.__sm.storage_img(image, user_name, info) + self.logger.info(" Image stored") def get_images(self, num: int, username: str | None = None, date: str | None =None, time: str | None = None) -> list: """Returns a list of images from the given camera @@ -197,13 +293,17 @@ def login(self, name: str, password: str) -> bool: Returns: bool: True if the user was logged in, False otherwise """ + self.logger.info(" Logging in user...") # update users users = self.__get_users() - + self.logger.info(" Checking if user exists...") # check if user exists usernames = [ user.name for user in users ] if name not in usernames: return False + self.logger.info(" User exists") + + self.logger.info(" Checking password...") # check if password is correct return self.__authenticate(name, password) @@ -215,6 +315,10 @@ def remove_image(self, username: str, password:str, date: str, time: str) -> Non date (str): date of the image time (str): time of the image """ + self.logger.info(" Removing image...") + # check if user exists and if password is correct + + self.logger.info(" Checking users credentials...") if username == "" or username is None: raise ValueError("Username cannot be empty") elif date == "": @@ -224,14 +328,28 @@ def remove_image(self, username: str, password:str, date: str, time: str) -> Non if not self.__authenticate(username, password): raise ValueError("User or password incorrect") - + self.logger.info(" Users credentials are valid") self.__sm.remove_image(username, date, time) + self.logger.info(" Image removed") def __authenticate(self, name: str, password: str) -> bool: # get users salt and password auth = False users = self.__get_users() + + # decrypt password + self.logger.info(" Decrypting password...") + password = self.__private_key.decrypt( + bytes.fromhex(password), + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ).decode() + self.logger.info(" Password decrypted") + for user in users: if user.name == name: # generate kdf with salt @@ -246,6 +364,7 @@ def __authenticate(self, name: str, password: str) -> bool: if user.password == derivated_pass: auth = True + self.logger.info(" Password is correct") # update salt and password user.salt_p = uuid.uuid4().hex kdf = Scrypt( @@ -268,4 +387,4 @@ def clear_server(self): self.__sm.delete_all_images() self.__sm.delete_all_users() self.__sm.create_directories() - print("Server cleared") \ No newline at end of file + self.logger.info("Server cleared") \ No newline at end of file diff --git a/src/screens/image_selector_toplevel.py b/src/screens/image_selector_toplevel.py index 8e41201..f5dd48e 100644 --- a/src/screens/image_selector_toplevel.py +++ b/src/screens/image_selector_toplevel.py @@ -97,8 +97,7 @@ def check_and_send(self): self.app.showUserScreen() self.destroy() except ValueError as e: - print(e) - messagebox.showerror("Error", f"{e}") + messagebox.showerror("Error", str(e)) def update_canvas_selection(self):