diff --git a/CHANGELOG.md b/CHANGELOG.md index 88254d0..16abd85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,21 @@ +## [3.1.0] - 2024-08-13 +### Added +- Added new functionality to enhance the custom passwords passed to lil-pwny + - Lil Pwny can now take the custom password list and create a number of variations of each password in the list: + - Passwords with common 'leetspeak' substitutions (e.g. `P@ssw0rd`) + - Uppercase versions of the password, and uppercase first characters (e.g. `PASSWORD`, `Password`) + - Passwords with common special characters appended or prepended (e.g. `password!`, `!password`) + - Passwords padded with common alphanumeric characters, special characters and repetitions of themselves to make them meet a given minimum length (e.g. `password123!`, `!passwordabc`, `passwordpassword`) + - Passwords with dates appended starting from the year 1950 up to 10 years from today's date (e.g. `password1950`, `password2034`) + - To give an idea, a password list of 100 custom passwords generates 49848660 variations +- Logging now includes the plaintext password for custom password list matches + - This is useful for identifying the password that was found in the custom password list + - These are redacted if the `--obfuscate` flag is used + +### Changed +- `--debug` option switched to `--verbose` to more accurately describe output + - Some logging output moved from INFO level to DEBUG (displayed when `--verbose` is passed) + ## [3.0.1] - 2024-07-22 ### Added - Updated logging diff --git a/README.md b/README.md index d715a4e..71ddd62 100644 --- a/README.md +++ b/README.md @@ -10,13 +10,25 @@ Fast offline auditing of Active Directory passwords using Python. ## About Lil Pwny Lil Pwny is a Python application to perform an offline audit of NTLM hashes of users' passwords, recovered from Active Directory, against known compromised passwords from Have I Been Pwned. Results will be output in JSON format containing the username, matching hash (can be obfuscated), and how many times the matching password has been seen in HIBP -There are also additional features: -- Ability to provide a list of your own custom passwords to check AD users against. This allows you to check user passwords against passwords relevant to your organisation that you suspect people might be using. These are NTLM hashed, and AD hashes are then compared with this as well as the HIBP hashes. -- Return a list of accounts using the same passwords. Useful for finding users using the same password for their administrative and standard accounts. -- Obfuscate hashes in output, for if you don't want to handle or store live user NTLM hashes. - More information about Lil Pwny can be found [on my blog](https://papermtn.co.uk/category/tools/lil-pwny/) +## Features + +- **Custom Password Auditing**: Ability to provide a list of your own custom passwords to check AD users against. This allows you to check user passwords against passwords relevant to your organisation that you suspect people might be using. + - Pass a .txt file with the plaintext passwords you want to search for, these are then NTLM hashed and AD hashes are then compared with this as well as the HIBP hashes. +- **Detect Duplicates**: Return a list of accounts using the same passwords. Useful for finding users using the same password for their administrative and standard accounts. +- **Obfuscated Output**: Obfuscate hashes in output, for if you don't want to handle or store live user NTLM hashes. + +### Custom Password List Enhancement +Lil Pwny provides the functionality to enhance your custom password list by adding commonly used variants of your custom passwords. These include: +- Passwords with common 'leetspeak' substitutions (e.g. `P@ssw0rd`) +- Uppercase versions of the password, and uppercase first characters (e.g. `PASSWORD`, `Password`) +- Passwords with common special characters appended or prepended (e.g. `password!`, `!password`) +- Passwords padded with common alphanumeric characters, special characters and repetitions of themselves to make them meet a given minimum length (e.g. `password123!`, `!passwordabc`, `passwordpassword`) + - You pass your desired minimum password length to Lil Pwny when selecting the custom list enhancement option +- Passwords with dates appended starting from the year 1950 up to 10 years from today's date (e.g. `password1950`, `password2034`) + +A custom password list of 100 plaintext passwords generates 49848660 variations. ## Resources This application has been developed to make the most of multiprocessing in Python, with the aim of it working as fast as possible on consumer level hardware. @@ -26,10 +38,14 @@ Because it uses multiprocessing, the more cores you have available, the faster L - 12 logical cores - 0:04:28.579201 ## Output -Lil Pwny will output results as either to stdout or JSON: +Lil Pwny will output results as either to stdout: + + + +or JSON: ```json -{"localtime": "2021-00-00 00:00:00,000", "level": "NOTIFY", "source": "Lil Pwny", "match_type": "hibp", "detection_data": {"username": "RICKON.STARK", "hash": "0C02C50B2B08F2979DFDE12EDA472FC1", "matches_in_hibp": "24230577", "obfuscated": "True"}} +{"localtime": "2021-00-00 00:00:00,000", "level": "NOTIFY", "source": "Lil Pwny", "match_type": "hibp", "detection_data": {"username": "RICKON.STARK", "hash": "32ED87BDB5FDC5E9CBA88547376818D4", "matches_in_hibp": "24230577", "obfuscated": "True"}} ``` You can redirect the JSON output of Lil Pwny to file: ```commandline @@ -48,7 +64,7 @@ pip install lil-pwny Lil-pwny will be installed as a global command, use as follows: ``` -usage: lil-pwny [-h] -hibp HIBP [--version] [-c CUSTOM] -ad AD_HASHES [-d] [-output {file,stdout,json}] [-o] [--debug] +usage: lil-pwny [-h] -hibp HIBP [-v] [-c CUSTOM] [-custom-enhance CUSTOM_ENHANCE] -ad AD_HASHES [-d] [-output {file,stdout,json}] [-o] [--verbose] Fast offline auditing of Active Directory passwords using Python @@ -56,17 +72,19 @@ options: -h, --help show this help message and exit -hibp HIBP, --hibp HIBP The .txt file containing HIBP NTLM hashes - --version show program's version number and exit + -v, --version show program's version number and exit -c CUSTOM, --custom CUSTOM .txt file containing additional custom passwords to check for + -custom-enhance CUSTOM_ENHANCE, --custom-enhance CUSTOM_ENHANCE + generate an enhanced custom password list based on the provided custom password list. Must be used with -c/--custom flag. The enhanced list will stored in memory and not + written to disk. Provide the minimum length of the passwords you want. Default is 8 -ad AD_HASHES, --ad-hashes AD_HASHES The .txt file containing NTLM hashes from AD users -d, --duplicates Output a list of duplicate password users -output {file,stdout,json}, --output {file,stdout,json} Where to send results -o, --obfuscate Obfuscate hashes from discovered matches by hashing with a random salt - --debug Turn on debug level logging - + --verbose Turn on verbose logging ``` @@ -103,5 +121,12 @@ Get-ADDBAccount -All -DBPath '.\Active Directory\ntds.dit' -BootKey $bootKey | F ### Step 3: Download the latest HIBP hash file The file can be downloaded from the HIBP API using a .net utility [here](https://github.com/HaveIBeenPwned/PwnedPasswordsDownloader) +### Optional Step: Filter unwanted AD accounts +The PowerShell script in the [scripts](./scripts/Filter-ADUsers) directory can be used to remove unwanted accounts from the IFM output before processing. These include: + +- Disabled accounts +- Computer accounts + + ## Resources - [ntdsutil & IFM](https://docs.microsoft.com/en-us/previous-versions/windows/it-pro/windows-server-2012-r2-and-2012/cc732530(v=ws.11)) diff --git a/images/stdout-screenshot.png b/images/stdout-screenshot.png new file mode 100644 index 0000000..479dcea Binary files /dev/null and b/images/stdout-screenshot.png differ diff --git a/poetry.lock b/poetry.lock index c18fb33..b3eeb04 100644 --- a/poetry.lock +++ b/poetry.lock @@ -153,5 +153,5 @@ files = [ [metadata] lock-version = "2.0" -python-versions = "^3.12" -content-hash = "83340d4d8e4ef0412409fc4aac3b45cb2ffd7b44760f2ce29cd50030ad0cc3f9" +python-versions = ">=3.11" +content-hash = "cc362e9139233beab282a2a0e784830a2a47e84a09891e25f019aa39ba4baa97" diff --git a/pyproject.toml b/pyproject.toml index 7153625..f3a5878 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,13 +1,13 @@ [tool.poetry] name = "lil-pwny" -version = "3.0.1" +version = "3.1.0" description = "Fast offline auditing of Active Directory passwords using Python and multiprocessing" authors = ["PaperMtn "] license = "GPL-3.0" readme = "README.md" [tool.poetry.dependencies] -python = "^3.12" +python = ">=3.11" colorama = "^0.4.6" pycryptodome = "^3.20.0" charset-normalizer = "^3.3.2" diff --git a/scripts/Filter-ADUsers/Filter-ADUsers.ps1 b/scripts/Filter-ADUsers/Filter-ADUsers.ps1 new file mode 100644 index 0000000..451f3db --- /dev/null +++ b/scripts/Filter-ADUsers/Filter-ADUsers.ps1 @@ -0,0 +1,54 @@ +<# +.DESCRIPTION + This script Filters username and NTLM hash pairs from the given IFM output. It excludes entries that are for disabled users or computer accounts by checking against Active Directory using the ActiveDirectory module. Users are prompted to select a file at runtime. +.PARAMETER InputFile + The input file selected by the user, containing username:hash pairs from the AD IFM dump. +.OUTPUTS + A file named 'filtered_ad_hashes.txt' containing the filtered username:hash pairs from Active Directory output. +.EXAMPLE + .\Filter-ADUsers.ps1 + This command runs the script and prompts the user to select an input file for processing. +#> + +Import-Module ActiveDirectory +Add-Type -AssemblyName System.Windows.Forms + +# Create an OpenFileDialog to prompt the user for a file +$OpenFileDialog = New-Object System.Windows.Forms.OpenFileDialog +$OpenFileDialog.InitialDirectory = (Get-Location).Path +$OpenFileDialog.Filter = "Text files (*.txt)|*.txt|All files (*.*)|*.*" +$OpenFileDialog.Title = "Select the hashes file" + +# Show the dialog and get the selected file +$OpenFileDialog.ShowDialog() | Out-Null +$selectedFilePath = $OpenFileDialog.FileName + +# Check if a file was selected +if (-not [string]::IsNullOrEmpty($selectedFilePath)) { + # Initialize an array to store the output + $outputArray = @() + + # Read the content of the selected file and process each line + Get-Content $selectedFilePath | ForEach-Object { + # Split each line into username and hash + $username = $_.split(":")[0] + $hash = $_.split(":")[1] + + # Check if the username does not end with a dollar sign - a computer account + if ($username -notmatch '\$$') { + if ($username -notmatch "[^a-zA-Z0-9.]") { + $adUser = Get-ADUser -Filter "SamAccountName -eq '$username'" + # Check if the user account is enabled + if ($adUser.Enabled -eq $true) { + $outputArray += "$($username):$($hash)" + } + } + } + } + + $outputArray | Out-File -FilePath .\filtered_ad_hashes.txt + + Write-Output "Filtering complete. The output has been saved to 'filtered_ad_hashes.txt'." +} else { + Write-Output "No file was selected. Exiting script." +} diff --git a/scripts/Filter-ADUsers/README.md b/scripts/Filter-ADUsers/README.md new file mode 100644 index 0000000..764d946 --- /dev/null +++ b/scripts/Filter-ADUsers/README.md @@ -0,0 +1,8 @@ +# Filter Active Directory output +You can use the script Filter-ADUsers.ps1 to filter the following out from the IFM export from Active Directory: +- Disabled accounts +- Computer accounts + +This saves you from processing accounts that aren’t useful. + +**Note**: You will need to have Remote Server Administrative Tools (RSAT) added from optional features in Windows to use the `ActiveDirectory` PowerShell module. \ No newline at end of file diff --git a/src/lil_pwny/__init__.py b/src/lil_pwny/__init__.py index e963b2f..12261af 100644 --- a/src/lil_pwny/__init__.py +++ b/src/lil_pwny/__init__.py @@ -4,30 +4,30 @@ import tempfile import time import traceback -import warnings from datetime import timedelta from importlib import metadata from lil_pwny import password_audit, hashing -from lil_pwny.loggers import JSONLogger, StdoutLogger +from lil_pwny.custom_password_enhancer import CustomPasswordEnhancer from lil_pwny.exceptions import FileReadError +from lil_pwny.loggers import JSONLogger, StdoutLogger output_logger = JSONLogger -def init_logger(logging_type: str, debug: bool) -> JSONLogger or StdoutLogger: +def init_logger(logging_type: str, verbose: bool) -> JSONLogger or StdoutLogger: """ Create a logger object. Defaults to stdout if no option is given Args: logging_type: Type of logging to use - debug: Whether to use debug level logging or not + verbose: Whether to use verbose logging or not Returns: JSONLogger or StdoutLogger """ if not logging_type or logging_type == 'stdout': - return StdoutLogger(debug=debug) - return JSONLogger(debug=debug) + return StdoutLogger(debug=verbose) + return JSONLogger(debug=verbose) def get_readable_file_size(file_path: str) -> str: @@ -73,6 +73,12 @@ def main(): '-c', '--custom', help='.txt file containing additional custom passwords to check for', dest='custom') + parser.add_argument( + '-custom-enhance', '--custom-enhance', + help='generate an enhanced custom password list based on the provided custom password list. Must be used' + ' with -c/--custom flag. The enhanced list will stored in memory and not written to disk.' + ' Provide the minimum length of the passwords you want. Default is 8', + dest='custom_enhance') parser.add_argument( '-ad', '--ad-hashes', help='The .txt file containing NTLM hashes from AD users', @@ -96,10 +102,10 @@ def main(): default=False, help='Obfuscate hashes from discovered matches by hashing with a random salt') parser.add_argument( - '--debug', - dest='debug', + '--verbose', + dest='verbose', action='store_true', - help='Turn on debug level logging') + help='Turn on verbose logging') args = parser.parse_args() hibp_file = args.hibp @@ -108,19 +114,20 @@ def main(): duplicates = args.d logging_type = args.logging_type obfuscate = args.obfuscate - debug = args.debug + verbose = args.verbose + custom_enhance = args.custom_enhance hasher = hashing.Hashing() if logging_type == 'file': logging_type = 'stdout' - logger = init_logger(logging_type, debug) + logger = init_logger(logging_type, verbose) logger.log('WARNING', 'File output is no longer supported.' ' Select JSON output and redirect this to file. Defaulting to stdout') else: - logger = init_logger(logging_type, debug) + logger = init_logger(logging_type, verbose) - logger.log('INFO', 'Lil Pwny started execution') + logger.log('SUCCESS', 'Lil Pwny started execution') logger.log('INFO', f'Version: {project_metadata.get("version")}') logger.log('INFO', f'Created by: {project_metadata.get("author")}') logger.log('INFO', 'Loading AD user hashes...') @@ -138,13 +145,13 @@ def main(): # Check HIBP file size try: - logger.log('INFO', f'Size of HIBP file provided {get_readable_file_size(hibp_file)}') + logger.log('SUCCESS', f'Size of HIBP file provided {get_readable_file_size(hibp_file)}') except FileNotFoundError as e: logger.log('CRITICAL', f'HIBP file not found: {e.filename}') sys.exit(1) # Compare AD users against HIBP hashes - logger.log('INFO', f'Comparing {ad_lines} AD users against HIBP compromised passwords...') + logger.log('SUCCESS', f'Comparing {ad_lines} AD users against HIBP compromised passwords...') try: hibp_results = password_audit.search( log_handler=logger, @@ -167,24 +174,66 @@ def main(): custom_count = 0 if custom_passwords: try: - custom_content = hasher.get_hashes(custom_passwords) - with tempfile.NamedTemporaryFile('w', delete=False) as temp_file: - for h in custom_content: - temp_file.write(f'{h}:0\n') - temp_file_path = temp_file.name - - logger.log('INFO', f'Comparing {ad_lines} Active Directory' - f' users against {len(custom_content)} custom password hashes...') - custom_matches = password_audit.search( - log_handler=logger, - hibp_hashes_filepath=temp_file_path, - ad_user_hashes=ad_users, - finding_type='custom', - obfuscated=obfuscate) - custom_count = len(custom_matches) - if logging_type != 'stdout': - for result in custom_matches: - logger.log('NOTIFY', result, notify_type='custom') + logger.log('INFO', 'Loading custom password list...') + with open(custom_passwords, 'r') as f: + custom_passwords = [line.strip() for line in f if line.strip()] + logger.log('SUCCESS', f'Loaded {len(custom_passwords)} custom passwords') + + if custom_enhance: + custom_count = 0 + variants_count = 0 + logger.log('INFO', 'Enhancing custom password list by adding variations...') + custom_client = CustomPasswordEnhancer(min_password_length=int(custom_enhance)) + for custom_pwd in custom_passwords: + logger.log('DEBUG', f'Generating variants for `{custom_pwd}`...') + temp_custom_passwords = custom_client.enhance_password(custom_pwd) + logger.log('DEBUG', 'Converting custom passwords to NTLM hashes...') + custom_password_hashes = hasher.get_hashes(temp_custom_passwords) + variants_count += len(custom_password_hashes) + logger.log('SUCCESS', f'Generated {len(custom_password_hashes)} variants for `{custom_pwd}`') + with tempfile.NamedTemporaryFile('w', delete=False) as temp_file: + for h in custom_password_hashes: + temp_file.write(f'{h}\n') + temp_file_path = temp_file.name + logger.log('DEBUG', f'Custom hashes written to temp file {temp_file_path}') + + logger.log('INFO', f'Comparing {ad_lines} Active Directory' + f' users against {len(custom_password_hashes)} custom password hashes...') + custom_matches = password_audit.search( + log_handler=logger, + hibp_hashes_filepath=temp_file_path, + ad_user_hashes=ad_users, + finding_type='custom', + obfuscated=obfuscate) + os.remove(temp_file_path) + logger.log('DEBUG', f'Temp file {temp_file_path} deleted') + custom_count += len(custom_matches) + if logging_type != 'stdout': + for result in custom_matches: + logger.log('NOTIFY', result, notify_type='custom') + else: + logger.log('DEBUG', 'Converting custom passwords to NTLM hashes...') + custom_password_hashes = hasher.get_hashes(custom_passwords) + with tempfile.NamedTemporaryFile('w', delete=False) as temp_file: + for h in custom_password_hashes: + temp_file.write(f'{h}\n') + temp_file_path = temp_file.name + logger.log('DEBUG', f'Custom hashes written to temp file {temp_file_path}') + + logger.log('INFO', f'Comparing {ad_lines} Active Directory' + f' users against {len(custom_password_hashes)} custom password hashes...') + custom_matches = password_audit.search( + log_handler=logger, + hibp_hashes_filepath=temp_file_path, + ad_user_hashes=ad_users, + finding_type='custom', + obfuscated=obfuscate) + os.remove(temp_file_path) + logger.log('DEBUG', f'Temp file {temp_file_path} deleted') + custom_count = len(custom_matches) + if logging_type != 'stdout': + for result in custom_matches: + logger.log('NOTIFY', result, notify_type='custom') except FileNotFoundError as e: logger.log('CRITICAL', f'Custom password file not found: {e.filename}') sys.exit(1) @@ -211,12 +260,15 @@ def main(): time_taken = time.time() - start total_comp_count = custom_count + hibp_count - logger.log('INFO', 'Audit completed') - logger.log('INFO', f'Total compromised passwords: {total_comp_count}') - logger.log('INFO', f'Passwords matching HIBP: {hibp_count}') - logger.log('INFO', f'Passwords matching custom password dictionary: {custom_count}') - logger.log('INFO', f'Passwords duplicated (being used by multiple user accounts): {duplicate_count}') - logger.log('INFO', f'Time taken: {str(timedelta(seconds=time_taken))}') + logger.log('SUCCESS', 'Audit completed') + logger.log('SUCCESS', f'Total compromised passwords: {total_comp_count}') + logger.log('SUCCESS', f'Passwords matching HIBP: {hibp_count}') + logger.log('SUCCESS', f'Passwords matching custom password dictionary: {custom_count}') + if custom_enhance: + logger.log('SUCCESS', f'Variant passwords generated from {len(custom_passwords)} custom passwords:' + f' {variants_count}') + logger.log('SUCCESS', f'Passwords duplicated (being used by multiple user accounts): {duplicate_count}') + logger.log('SUCCESS', f'Time taken: {str(timedelta(seconds=time_taken))}') except Exception as e: logger.log('CRITICAL', str(e)) diff --git a/src/lil_pwny/custom_password_enhancer.py b/src/lil_pwny/custom_password_enhancer.py new file mode 100644 index 0000000..8084f6b --- /dev/null +++ b/src/lil_pwny/custom_password_enhancer.py @@ -0,0 +1,111 @@ +from typing import List +from datetime import datetime + + +class CustomPasswordEnhancer: + """ Enhances the custom password with additional variations + """ + + def __init__(self, min_password_length: int = 8): + self.min_password_length = min_password_length + + def _deduplicate(self, password_list: List) -> List: + """ Remove duplicates from the given list + """ + + return list(set(password_list)) + + def _remove_too_short(self, password_list: List) -> List: + """ Remove passwords that do not match the length requirements + """ + + return [password for password in password_list if len(password) >= self.min_password_length] + + def _add_leet_speak(self, password: str) -> List[str]: + """ Add leetspeak variations to a single password""" + + leet_speak_mappings = { + 'a': ['4', '@'], + 'b': ['8'], + 'e': ['3'], + 'g': ['6'], + 'i': ['1', '!'], + 'l': ['1'], + 'o': ['0'], + 's': ['5', '$'], + 't': ['7'], + 'z': ['2'], + } + + def _generate_variations(word: str, index: int = 0) -> List[str]: + if index == len(word): + return [word] + current_char = word[index] + variations = _generate_variations(word, index + 1) + if current_char.lower() in leet_speak_mappings: + additional_variations = [] + for leet_char in leet_speak_mappings[current_char.lower()]: + for variation in variations: + additional_variations.append(variation[:index] + leet_char + variation[index + 1:]) + variations.extend(additional_variations) + return variations + + return _generate_variations(password) + + def _capitalise_first_character(self, password_list: List) -> List: + """ Capitalise the first letter of each password in the list + """ + + return [password.capitalize() for password in password_list] + + def _pad_password(self, password_list: List) -> List: + """ Pad the password with the original word and additional characters to meet the minimum password length + Characters include alphanumeric characters and special characters + """ + + output_list = [] + for password in password_list: + if len(password) < self.min_password_length: + repeated_password = (password * ((self.min_password_length // len(password)) + 1))[ + :self.min_password_length] + padding_length = self.min_password_length - len(password) + lowercase_padding = ''.join(chr(97 + i % 26) for i in range(padding_length)) + uppercase_padding = ''.join(chr(65 + i % 26) for i in range(padding_length)) + numeric_padding = ''.join(chr(49 + i % 10) for i in range(padding_length)) + output_list.append(repeated_password) + output_list.append(password + lowercase_padding) + output_list.append(password + uppercase_padding) + output_list.append(password + numeric_padding) + return output_list + + def _append_years(self, password_list: List) -> List: + """ Append years from 1950 to ten years greater than the current year to each password in the list + """ + + current_year = datetime.now().year + end_year = current_year + years = [str(year) for year in range(1950, end_year + 1)] + return [password + year for password in password_list for year in years] + + def _append_special_characters(self, password_list: List) -> List: + """ Append special characters commonly used in passwords to the end of each password in the list + """ + + special_characters = ['!', '@', '#', '$', '%', '&', '*', '?'] + return [password + char for password in password_list for char in special_characters] + + def enhance_password(self, password: str) -> List: + """ Enhance a plaintext password list with additional variations + + Args: + password: The custom password to enhance + Returns: + Enhanced list of passwords + """ + + enhanced_list = self._add_leet_speak(password) + enhanced_list += self._capitalise_first_character(enhanced_list) + enhanced_list += self._pad_password(enhanced_list) + enhanced_list += self._append_years(enhanced_list) + enhanced_list += self._append_special_characters(enhanced_list) + return self._deduplicate(self._remove_too_short(enhanced_list)) diff --git a/src/lil_pwny/exceptions.py b/src/lil_pwny/exceptions.py index 6d655a7..da15bbe 100644 --- a/src/lil_pwny/exceptions.py +++ b/src/lil_pwny/exceptions.py @@ -9,12 +9,12 @@ def __init__(self, message): class HashingError(Exception): - """Base class for exceptions in this module.""" + """ Base class for exceptions in this module.""" pass class FileReadError(HashingError): - """Exception raised for errors in the input file. + """ Exception raised for errors in the input file. Attributes: filename: The name of the input file which caused the error. diff --git a/src/lil_pwny/hashing.py b/src/lil_pwny/hashing.py index 6c79cb9..0ed76d5 100644 --- a/src/lil_pwny/hashing.py +++ b/src/lil_pwny/hashing.py @@ -1,11 +1,11 @@ import binascii import hashlib import secrets -from typing import Dict +from typing import List +from multiprocessing import Pool, cpu_count -from Crypto.Hash import MD4 -from lil_pwny.exceptions import FileReadError +from Crypto.Hash import MD4 class Hashing(object): @@ -17,41 +17,35 @@ def __init__(self): @staticmethod def _hashify(input_string: str) -> str: - """Converts the input string to a NTLM hash and returns the hash + """ Converts the input string to a NTLM hash and returns the hash Args: input_string: string to be converted to NTLM hash Returns: Converted NTLM hash """ - hasher = MD4.new() hasher.update(input_string.encode('utf-16le')) output = hasher.digest() return binascii.hexlify(output).decode('utf-8').upper() - def get_hashes(self, input_file: str) -> Dict[str, str]: - """Reads the input file of passwords, converts them to NTLM hashes + def _process_password(self, password: str) -> str: + return f'{self._hashify(password)}:0:{password}' + + def get_hashes(self, password_list: List[str]) -> List[str]: + """ Converts a list of strings to NTLM hashes using multiprocessing Args: - input_file: file containing strings to convert to NTLM hashes + password_list: list of strings to convert to NTLM hashes Returns: - Dict that replicates HIBP format: 'hash:occurrence_count' + List of NTLM hashes of the passwords """ - - output_dict = {} - try: - with open(input_file, 'r') as f: - for item in f: - if item: - output_dict[self._hashify(item.strip())] = '0' - except IOError as e: - raise FileReadError(input_file, str(e)) - - return output_dict + with Pool(cpu_count()) as pool: + hashes = pool.map(self._process_password, password_list) + return hashes def obfuscate(self, input_hash: str) -> str: - """Further hashes the input NTLM hash with a random salt + """ Further hashes the input NTLM hash with a random salt Args: input_hash: hash to be obfuscated diff --git a/src/lil_pwny/loggers.py b/src/lil_pwny/loggers.py index afff6e7..2db93dc 100644 --- a/src/lil_pwny/loggers.py +++ b/src/lil_pwny/loggers.py @@ -40,7 +40,7 @@ def log(self, mes_type = 'HIBP' if notify_type == "custom": message = f'CUSTOM_MATCH: \n' \ - f' ACCOUNT: {message.get("username").lower()} HASH: {message.get("hash")} OBFUSCATED: {message.get("obfuscated")}' + f' ACCOUNT: {message.get("username").lower()} HASH: {message.get("hash")} PASSWORD: {message.get("plaintext_password")} OBFUSCATED: {message.get("obfuscated")}' mes_type = 'CUSTOM' if notify_type == "duplicate": message = 'DUPLICATE: \n' \ @@ -98,6 +98,12 @@ def log_to_stdout(self, key_color = Fore.YELLOW style = Style.NORMAL mes_type = '!' + elif mes_type == "SUCCESS": + base_color = Fore.LIGHTGREEN_EX + high_color = Fore.LIGHTGREEN_EX + key_color = Fore.LIGHTGREEN_EX + style = Style.NORMAL + mes_type = '>>' elif mes_type == "DUPLICATE": base_color = Fore.LIGHTGREEN_EX high_color = Fore.LIGHTGREEN_EX diff --git a/src/lil_pwny/password_audit.py b/src/lil_pwny/password_audit.py index 8ab1f3e..ad0d6d9 100644 --- a/src/lil_pwny/password_audit.py +++ b/src/lil_pwny/password_audit.py @@ -52,7 +52,7 @@ def import_users(filepath: str) -> Dict[str, List[str]]: def find_duplicates(ad_hash_dict: Dict, obfuscated: bool) -> List[dict]: - """Returns users using the same hash in the input file. Outputs + """ Returns users using the same hash in the input file. Outputs a file grouping all users of a hash being used more than once Args: @@ -130,7 +130,7 @@ def search(log_handler: JSONLogger or StdoutLogger, def _nonblank_lines(f: TextIO) -> str: - """Generator to filter out blank lines from the input list + """ Generator to filter out blank lines from the input list Args: f: input file @@ -215,7 +215,7 @@ def _multi_pro_search(log_handler: JSONLogger or StdoutLogger, worker_function: callable, worker_function_args: List, skip_lines: int = 0) -> List[dict]: - """Breaks the [HIBP|custom passwords] file into blocks and uses multiprocessing to iterate through them and return + """ Breaks the [HIBP|custom passwords] file into blocks and uses multiprocessing to iterate through them and return any matches against AD users. Args: @@ -239,8 +239,8 @@ def _multi_pro_search(log_handler: JSONLogger or StdoutLogger, jobs = _divide_blocks(hibp_filepath, 1024 * 1024 * block_size, skip_lines) jobs = [list(j) + [worker_function, encoding] + worker_function_args for j in jobs] - log_handler.log('INFO', f'Split into {len(jobs)} parallel jobs ') - log_handler.log('INFO', f'{cores} cores being utilised') + log_handler.log('DEBUG', f'Split into {len(jobs)} parallel jobs ') + log_handler.log('DEBUG', f'{cores} cores being utilised') pool = mp.Pool(cores - 1, maxtasksperchild=1000) @@ -281,7 +281,13 @@ def _worker(line: str, """ try: - ntlm_hash, count = line.rstrip().split(':')[0].upper(), line.rstrip().split(':')[1].strip().upper() + ntlm_hash, count, plaintext_password = (line.rstrip().split(':')[0].upper(), + line.rstrip().split(':')[1].strip().upper(), + line.rstrip().split(':')[2].strip()) + except IndexError: + ntlm_hash, count, plaintext_password = (line.rstrip().split(':')[0].upper(), + line.rstrip().split(':')[1].strip().upper(), + 'REDACTED') except Exception as e: if logger: logger.log('ERROR', f'Failed to parse line: {line}. Error: {str(e)}') @@ -291,11 +297,13 @@ def _worker(line: str, return_hash = ntlm_hash if obfuscated: return_hash = hash_client.obfuscate(ntlm_hash) + plaintext_password = 'REDACTED' for u in user_list.get(ntlm_hash): finding = { 'username': u, 'hash': return_hash, 'matches_in_hibp': count, + 'plaintext_password': plaintext_password, 'obfuscated': obfuscated } if isinstance(logger, StdoutLogger):