diff --git a/jwtcat.py b/jwtcat.py index 44ccd44..31623c8 100755 --- a/jwtcat.py +++ b/jwtcat.py @@ -22,13 +22,14 @@ import time from datetime import datetime, timedelta from itertools import chain, product +from concurrent.futures import ProcessPoolExecutor # Improvement 1: For multiprocessing import coloredlogs import jwt from tqdm import tqdm logger = logging.getLogger(__name__) -coloredlogs.install(level='DEBUG', milliseconds=True) +coloredlogs.install(level='DEBUG', milliseconds=True, fmt='%(asctime)s %(levelname)s %(message)s') def parse_args(): @@ -105,16 +106,11 @@ def parse_args(): parser.add_argument( "-lL", "--log-level", - default=logging.INFO, - dest="log_level", - # TODO: Improve how to retrieve all log levels - choices=[ - 'DEBUG', - 'INFO', - ], + default='logging.INFO', + choices=logging._nameToLevel.keys(), # Improvement 2: Retrieve all log levels dynamically help="Set the logging level", - type=str, - required=False, + type = str, + required=False ) parser.add_argument( @@ -222,15 +218,12 @@ def is_vulnerable(args): args {object} -- The command-line arguments """ headers = jwt.get_unverified_header(args.token) + alg = headers.get('alg') + if alg in ["HS256", "HS512", "HS384", "None"]: + logger.info(f"JWT potentially vulnerable with algorithm: {alg}") + else: + logger.warning(f"JWT signed with non-vulnerable algorithm: {alg}") - if headers['alg'] == "HS256": - logging.info("JWT vulnerable to HS256 guessing attacks") - elif headers['alg'] == "HS512": - logging.info("JWT vulnerable to HS512 guessing attacks") - elif headers['alg'] == "HS384": - logging.info("JWT vulnerable to HS384 guessing attacks") - elif headers['alg'] == "None": - logging.info("JWT vulnerable to CVE-2018-1000531") def hs256_attack(args): @@ -241,29 +234,31 @@ def hs256_attack(args): args {object} -- The command-line arguments """ headers = jwt.get_unverified_header(args.token) - - if not headers['alg'] == "HS256" and not headers['alg'] == "HS512" and not headers['alg'] == "HS384": - logging.error("JWT signed using an algorithm other than HS256 or HS512 or HS384") - else: - tqdm_disable = True if args.log_level == "DEBUG" else False - - if args.attack_mode == "brute-force": - # Count = .... - for candidate in tqdm(bruteforce(args.charset, args.increment_min, args.increment_max), disable=tqdm_disable): - if run(args.token, candidate): - return candidate - - return None - - elif args.attack_mode == "wordlist": - word_count = len(open(args.wordlist.name, "r", - encoding="utf-8", - errors="ignore").readlines()) - for entry in tqdm(args.wordlist, disable=tqdm_disable, total=word_count): - if run(args.token, entry.rstrip()): - return entry.rstrip() + if headers['alg'] not in ["HS256", "HS512", "HS384"]: + logger.error("JWT signed using unsupported algorithm") + return None + + tqdm_disable = args.log_level == "DEBUG" + candidate = None + + if args.attack_mode == "brute-force": + # Improvement: Use multiprocessing for brute-force attack + with ProcessPoolExecutor() as executor: + for result in tqdm(executor.map(lambda x: run(args.token, x), bruteforce(args.charset, args.increment_min, args.increment_max)), disable=tqdm_disable): + if result: + candidate = result + break + elif args.attack_mode == "wordlist": + # Improved batch reading for wordlist mode to reduce I/O time + wordlist = [line.strip() for line in args.wordlist] + with ProcessPoolExecutor() as executor: + for result in tqdm(executor.map(lambda word: run(args.token, word), wordlist), disable=tqdm_disable, total=len(wordlist)): + if result: + candidate = result + break + + return candidate - return None def main(): diff --git a/requirements.txt b/requirements.txt index 8029220..89e8d9a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ -autopep8==1.5.2 -coloredlogs==14.0 -humanfriendly==8.2 -pycodestyle==2.6.0 -PyJWT==2.4.0 -pyreadline==2.1 -tqdm==4.46.0 +autopep8==1.6.0 # Updated from 1.5.2 to 1.6.0 +coloredlogs==15.0.1 # Updated from 14.0 to 15.0.1 +humanfriendly==10.0 # Updated from 8.2 to 10.0 +pycodestyle==2.10.0 # Updated from 2.6.0 to 2.10.0 +PyJWT==2.6.0 # Updated from 2.4.0 to 2.6.0 +pyreadline==2.1 # No update available +tqdm==4.65.0 # Updated from 4.46.0 to 4.65.0