Skip to content

Commit

Permalink
Refine the docstrings and typing for client
Browse files Browse the repository at this point in the history
Signed-off-by: Akashdeep Dhar <akashdeep.dhar@gmail.com>
  • Loading branch information
gridhead committed Nov 10, 2024
1 parent 6c4e3f2 commit 3917b1c
Show file tree
Hide file tree
Showing 11 changed files with 339 additions and 32 deletions.
34 changes: 34 additions & 0 deletions expedite/client/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,26 @@


def derive_code(password: str = standard.client_pswd, salt: bytes = standard.client_salt) -> bytes:
"""
Derive byte convertion from password and salt composition
:param password: Password provided by the clients
:param salt: Byte array for additional protection
:return: Cryptography HMAC code
"""
kdfo = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend())
return kdfo.derive(password.encode())


def encr_bite(data: bytes = b"", code: bytes = standard.client_code, invc: bytes = standard.client_invc) -> bytes:
"""
Encrypt file chunks using converted cryptography HMAC code
:param data: Chunks to read, encrypt and deliver
:param code: Generated cryptography HMAC code
:param invc: Initialization vector for added protection
:return: Encrypted bytes
"""
cipher = Cipher(algorithms.AES(code), modes.CBC(invc), backend=default_backend())
encrob = cipher.encryptor()
padder = padding.PKCS7(algorithms.AES.block_size).padder()
Expand All @@ -47,6 +62,14 @@ def encr_bite(data: bytes = b"", code: bytes = standard.client_code, invc: bytes


def decr_bite(data: bytes = b"", code: bytes = standard.client_code, invc: bytes = standard.client_invc) -> bytes:
"""
Decrypt file chunks using converted cryptography HMAC code
:param data: Chunks to collect, decrypt and save
:param code: Generated cryptography HMAC code
:param invc: Initialization vector for added protection
:return: Decrypted bytes
"""
try:
cipher = Cipher(algorithms.AES(code), modes.CBC(invc), backend=default_backend())
decrob = cipher.decryptor()
Expand All @@ -58,6 +81,11 @@ def decr_bite(data: bytes = b"", code: bytes = standard.client_code, invc: bytes


def encr_metadata() -> bytes:
"""
Encrypt metadata to deliver before file contents
:return: Encrypted bytes
"""
standard.client_invc, standard.client_salt = os.urandom(16), os.urandom(16)
data = dumps({"call": "meta", "name": standard.client_filename, "size": standard.client_filesize, "chks": len(standard.client_bind)-1})
standard.client_code = derive_code(standard.client_pswd, standard.client_salt)
Expand All @@ -67,6 +95,12 @@ def encr_metadata() -> bytes:


def decr_metadata(pack: bytes = b"") -> tuple:
"""
Decrypt metadata to collect before file contents
:param pack: Chunks to decrypt
:return: Decrypted chunks
"""
standard.client_invc, standard.client_salt = pack[0:16], pack[16:32]
data = pack[32:]
standard.client_code = derive_code(standard.client_pswd, standard.client_salt)
Expand Down
34 changes: 34 additions & 0 deletions expedite/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,30 @@


def find_size() -> int:
"""
Retrieve the file size using the file location
:return: Size of the intended file
"""
return getsize(standard.client_file)


def find_name() -> str:
"""
Retrieve the file name using the file location
:return: Name of the intended file
"""
return basename(standard.client_file)


def ease_size(size: Union[int, float]) -> str:
"""
Retrieve the file size in human-readable format
:param size: Size in byte count format
:return: Size in human-readable format
"""
unitlist = ["B", "KB", "MB", "GB", "TB", "PB"]
indx, opsz = 0, size
if size == 0:
Expand All @@ -48,6 +64,11 @@ def ease_size(size: Union[int, float]) -> str:


def bite_file() -> list:
"""
Retrieve the list of read ranges from chunk size
:return: List of read ranges
"""
init, size, bite = 0, standard.client_filesize, []
while init < size:
bite.append(init)
Expand All @@ -60,6 +81,13 @@ def bite_file() -> list:


def read_file(init: int = 0, fina: int = 0) -> bytes:
"""
Retrieve the chunk from the provided byte range
:param init: Starting byte index for reading
:param fina: Stopping byte index for reading
:return: Chunks to read
"""
if exists(standard.client_file):
with open(standard.client_file, "rb") as file:
file.seek(init)
Expand All @@ -72,6 +100,12 @@ def read_file(init: int = 0, fina: int = 0) -> bytes:


def fuse_file(pack: bytes = b"") -> bool:
"""
Create and join the chunks on the storage device
:param pack: Chunks to save
:return:
"""
if not standard.client_fileinit:
mode, standard.client_fileinit = "wb", True
else:
Expand Down
14 changes: 12 additions & 2 deletions expedite/client/bridge/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
from expedite.client.bridge.room import MainWindow


def load_custom_font():
def load_custom_font() -> None:
"""
Populate the application database with custom fonts
:return:
"""
fontlist = [
":font/font/sans-bold.ttf",
":font/font/sans-rlar.ttf",
Expand All @@ -42,7 +47,12 @@ def load_custom_font():
QFontDatabase.addApplicationFont(indx)


def main():
def main() -> None:
"""
Start the worker module to start the transfer service
:return:
"""
os.environ["QT_AUTO_SCREEN_SCALE_FACTOR"] = "1"
QApplication.setStyle("Fusion")
app = QApplication(sys.argv)
Expand Down
4 changes: 2 additions & 2 deletions expedite/client/bridge/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
collect_digest_checks,
collect_dropping_summon,
collect_metadata,
collect_permission_to_join,
collect_connection_to_server,
collect_separation_from_mistaken_password,
deliver_confirmation,
deliver_connection_to_server,
Expand Down Expand Up @@ -221,7 +221,7 @@ async def maintain_connection(self):
if standard.client_plan in ["SEND", "RECV"]:
# If the purpose of the client is either DELIVERING or COLLECTING
if mesgdict["call"] == "okay":
await collect_permission_to_join(mesgdict["iden"])
await collect_connection_to_server(mesgdict["iden"])
self.statarea.showMessage("Please share your acquired identity to begin interaction")
self.identity.setText(f"{mesgdict["iden"]}")
elif mesgdict["call"] in ["awry", "lone"]:
Expand Down
98 changes: 82 additions & 16 deletions expedite/client/bridge/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@
from os.path import exists

from PySide6.QtWidgets import QFileDialog
from click import Tuple


def show_location_dialog(parent=None, oper: str = "") -> str:
"""
Select filepath for the intended file for delivering or collecting
:param parent: Parent window within which the location dialog exists
:param oper: Operation intent for choosing either file or directory
:return:
"""
dialog = QFileDialog()
if oper == "dlvr":
client_path = dialog.getOpenFileName(parent, "Select location", "", "All Files (*)")[0]
Expand All @@ -36,13 +44,25 @@ def show_location_dialog(parent=None, oper: str = "") -> str:


def truncate_text(text: str = "", size: int = 32) -> str:
"""
Limit text elements to a certain character count for an elegant fit
:param text: Text elements that need to be checked for fitting
:param size: Character count within which text needs to be fit
:return: Truncated text
"""
if len(text) >= size:
return text[0:size-3] + "..."
else:
return text


def return_detail_text() -> str:
"""
Retrieve application information text for showing on the dialog box
:return: Application information
"""
text = """
<b>Expedite Bridge v{vers}</b><br/>
<i>A simple encrypted file transfer service for humans</i><br/><br/>
Expand All @@ -54,7 +74,12 @@ def return_detail_text() -> str:


class ValidateFields:
def __init__(self):
def __init__(self) -> None:
"""
Initialize fields validation class for confirmation
:return:
"""
self.okay = {
"size": False,
"time": False,
Expand All @@ -70,7 +95,13 @@ def __init__(self):
"pswd": "Password cannot be an empty string"
}

def verify_size(self, size):
def verify_size(self, size) -> None:
"""
Ensure that processing size must be an integer value between 1024 and 524288
:param size: Processing size
:return:
"""
self.okay["size"] = True
try:
oper = int(size.strip())
Expand All @@ -79,7 +110,13 @@ def verify_size(self, size):
except ValueError:
self.okay["size"] = False

def verify_time(self, time):
def verify_time(self, time) -> None:
"""
Ensure that expiry window must be an integer value between 5 and 300
:param time: Expiry window
:return:
"""
self.okay["time"] = True
try:
oper = int(time.strip())
Expand All @@ -88,38 +125,67 @@ def verify_time(self, time):
except ValueError:
self.okay["time"] = False

def verify_file(self, file):
def verify_file(self, file) -> None:
"""
Ensure that filepath for delivering or collecting contents must exist
:param path: Load filepath
:return:
"""
self.okay["file"] = True
if not exists(file):
self.okay["file"] = False

def verify_path(self, path):
def verify_path(self, path) -> None:
"""
Ensure that filepath for delivering or collecting contents must exist
:param path: Save filepath
:return:
"""
self.okay["path"] = True
if path.strip() != "" and not exists(path):
self.okay["path"] = False

def verify_pswd(self, pswd):
def verify_pswd(self, pswd) -> None:
"""
Ensure that password cannot be an empty string
:param pswd: Password string
:return:
"""
self.okay["pswd"] = True
if pswd.strip() == "":
self.okay["pswd"] = False

def report_dlvr(self, size, time, file, pswd):
def report_dlvr(self, size, time, file, pswd) -> tuple[tuple[bool, bool, bool, bool], str]:
"""
Retrieve field validation results for delivering intent
:param size: Validity confirmation of processing size
:param time: Validity confirmation of waiting window
:param file: Validity confirmation of delivering filepath
:param pswd: Validity confirmation of password string
:return: Validity confirmation for required elements
"""
self.verify_size(size)
self.verify_time(time)
self.verify_file(file)
self.verify_pswd(pswd)
lict = [self.text[indx] for indx in ["size", "time", "file", "pswd"] if not self.okay[indx]]
return (
(self.okay["size"], self.okay["time"], self.okay["file"], self.okay["pswd"]),
"\n".join(lict).strip()
)
return (self.okay["size"], self.okay["time"], self.okay["file"], self.okay["pswd"]), "\n".join(lict).strip()

def report_clct(self, time, path, pswd) -> tuple[tuple[bool, bool, bool], str]:
"""
Retrieve field validation results for collecting intent
def report_clct(self, time, path, pswd):
:param time: Validity confirmation of waiting window
:param path: Validity confirmation of collecting filepath
:param pswd: Validity confirmation of password string
:return: Validity confirmation for required elements
"""
self.verify_time(time)
self.verify_path(path)
self.verify_pswd(pswd)
lict = [self.text[indx] for indx in ["time", "path", "pswd"] if not self.okay[indx]]
return (
(self.okay["time"], self.okay["path"], self.okay["pswd"]),
"\n".join(lict).strip(),
)
return (self.okay["time"], self.okay["path"], self.okay["pswd"]), "\n".join(lict).strip()
Loading

0 comments on commit 3917b1c

Please sign in to comment.