Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Multiprocessing ProcessPoolExecutor + Dynamic Logs + Dependencies Update #22

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 35 additions & 40 deletions jwtcat.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import time
from datetime import datetime, timedelta
from itertools import chain, product
from concurrent.futures import ProcessPoolExecutor # Improvement 1: For multiprocessing

import coloredlogs
import jwt
from tqdm import tqdm

logger = logging.getLogger(__name__)
coloredlogs.install(level='DEBUG', milliseconds=True)
coloredlogs.install(level='DEBUG', milliseconds=True, fmt='%(asctime)s %(levelname)s %(message)s')


def parse_args():
Expand Down Expand Up @@ -105,16 +106,11 @@ def parse_args():

parser.add_argument(
"-lL", "--log-level",
default=logging.INFO,
dest="log_level",
# TODO: Improve how to retrieve all log levels
choices=[
'DEBUG',
'INFO',
],
default='logging.INFO',
choices=logging._nameToLevel.keys(), # Improvement 2: Retrieve all log levels dynamically
help="Set the logging level",
type=str,
required=False,
type = str,
required=False
)

parser.add_argument(
Expand Down Expand Up @@ -222,15 +218,12 @@ def is_vulnerable(args):
args {object} -- The command-line arguments
"""
headers = jwt.get_unverified_header(args.token)
alg = headers.get('alg')
if alg in ["HS256", "HS512", "HS384", "None"]:
logger.info(f"JWT potentially vulnerable with algorithm: {alg}")
else:
logger.warning(f"JWT signed with non-vulnerable algorithm: {alg}")

if headers['alg'] == "HS256":
logging.info("JWT vulnerable to HS256 guessing attacks")
elif headers['alg'] == "HS512":
logging.info("JWT vulnerable to HS512 guessing attacks")
elif headers['alg'] == "HS384":
logging.info("JWT vulnerable to HS384 guessing attacks")
elif headers['alg'] == "None":
logging.info("JWT vulnerable to CVE-2018-1000531")


def hs256_attack(args):
Expand All @@ -241,29 +234,31 @@ def hs256_attack(args):
args {object} -- The command-line arguments
"""
headers = jwt.get_unverified_header(args.token)

if not headers['alg'] == "HS256" and not headers['alg'] == "HS512" and not headers['alg'] == "HS384":
logging.error("JWT signed using an algorithm other than HS256 or HS512 or HS384")
else:
tqdm_disable = True if args.log_level == "DEBUG" else False

if args.attack_mode == "brute-force":
# Count = ....
for candidate in tqdm(bruteforce(args.charset, args.increment_min, args.increment_max), disable=tqdm_disable):
if run(args.token, candidate):
return candidate

return None

elif args.attack_mode == "wordlist":
word_count = len(open(args.wordlist.name, "r",
encoding="utf-8",
errors="ignore").readlines())
for entry in tqdm(args.wordlist, disable=tqdm_disable, total=word_count):
if run(args.token, entry.rstrip()):
return entry.rstrip()
if headers['alg'] not in ["HS256", "HS512", "HS384"]:
logger.error("JWT signed using unsupported algorithm")
return None

tqdm_disable = args.log_level == "DEBUG"
candidate = None

if args.attack_mode == "brute-force":
# Improvement: Use multiprocessing for brute-force attack
with ProcessPoolExecutor() as executor:
for result in tqdm(executor.map(lambda x: run(args.token, x), bruteforce(args.charset, args.increment_min, args.increment_max)), disable=tqdm_disable):
if result:
candidate = result
break
elif args.attack_mode == "wordlist":
# Improved batch reading for wordlist mode to reduce I/O time
wordlist = [line.strip() for line in args.wordlist]
with ProcessPoolExecutor() as executor:
for result in tqdm(executor.map(lambda word: run(args.token, word), wordlist), disable=tqdm_disable, total=len(wordlist)):
if result:
candidate = result
break

return candidate

return None


def main():
Expand Down
14 changes: 7 additions & 7 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
autopep8==1.5.2
coloredlogs==14.0
humanfriendly==8.2
pycodestyle==2.6.0
PyJWT==2.4.0
pyreadline==2.1
tqdm==4.46.0
autopep8==1.6.0 # Updated from 1.5.2 to 1.6.0
coloredlogs==15.0.1 # Updated from 14.0 to 15.0.1
humanfriendly==10.0 # Updated from 8.2 to 10.0
pycodestyle==2.10.0 # Updated from 2.6.0 to 2.10.0
PyJWT==2.6.0 # Updated from 2.4.0 to 2.6.0
pyreadline==2.1 # No update available
tqdm==4.65.0 # Updated from 4.46.0 to 4.65.0