From ca51a50c427e8bbd9624bb3031d53e2f41066cab Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Thu, 25 Dec 2025 13:56:18 +0530 Subject: [PATCH 01/30] PoC: initial commit --- user_scanner/email_scan/dev/github.py | 46 +++++++++++++++++++++ user_scanner/email_scan/social/instagram.py | 40 ++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 user_scanner/email_scan/dev/github.py create mode 100644 user_scanner/email_scan/social/instagram.py diff --git a/user_scanner/email_scan/dev/github.py b/user_scanner/email_scan/dev/github.py new file mode 100644 index 0000000..0902236 --- /dev/null +++ b/user_scanner/email_scan/dev/github.py @@ -0,0 +1,46 @@ +import asyncio +import httpx +import re + +async def check_email(email): + headers = { + "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", + "referer": "https://github.com/signup" + } + + async with httpx.AsyncClient(headers=headers, http2=True, follow_redirects=True) as client: + res = await client.get("https://github.com/signup") + + token_match = re.search(r'name="authenticity_token" value="([^"]+)"', res.text) + if not token_match: + return "Token not found" + + token = token_match.group(1) + + data = { + "authenticity_token": token, + "value": email + } + + post_headers = {"accept": "*/*", "x-requested-with": "XMLHttpRequest"} + response = await client.post("https://github.com/email_validity_checks", data=data, headers=post_headers) + + if not "The email you have provided is already associated with an account." in response.text: + return f"[+] {email} is not REGISTERED" + else: + return f"[-] {email} is Registerd" + +async def main(): + email_to_test = input("Enter the email: ") + result = await check_email(email_to_test) + print(result) + +if __name__ == "__main__": + asyncio.run(main()) + + + + + + diff --git a/user_scanner/email_scan/social/instagram.py b/user_scanner/email_scan/social/instagram.py new file mode 100644 index 0000000..53efc94 --- /dev/null +++ b/user_scanner/email_scan/social/instagram.py @@ -0,0 +1,40 @@ +import asyncio +import httpx + +async def check_instagram(email): + ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36" + async with httpx.AsyncClient(headers={"user-agent": ua}, http2=True) as client: + await client.get("https://www.instagram.com/") + csrf = client.cookies.get("csrftoken") + + headers = { + "x-csrftoken": csrf, + "x-ig-app-id": "936619743392459", + "x-requested-with": "XMLHttpRequest", + "referer": "https://www.instagram.com/accounts/password/reset/" + } + + resp = await client.post( + "https://www.instagram.com/api/v1/web/accounts/account_recovery_send_ajax/", + data={"email_or_username": email}, + headers=headers + ) + + data = resp.json() + if data.get("status") == "ok": + return f"[!] {email} -> USED (Account exists)" + return f"[*] {email} -> NOT USED" + +if __name__ == "__main__": + target = input("Enter the Email: ") + print(asyncio.run(check_instagram(target))) + + + + + + + + + + From f0a7c34c9b91fb532b9c60fb695520a881f71762 Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Thu, 25 Dec 2025 14:02:58 +0530 Subject: [PATCH 02/30] fix: ruff fixes --- user_scanner/email_scan/dev/github.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/user_scanner/email_scan/dev/github.py b/user_scanner/email_scan/dev/github.py index 0902236..5c37eaa 100644 --- a/user_scanner/email_scan/dev/github.py +++ b/user_scanner/email_scan/dev/github.py @@ -26,7 +26,7 @@ async def check_email(email): post_headers = {"accept": "*/*", "x-requested-with": "XMLHttpRequest"} response = await client.post("https://github.com/email_validity_checks", data=data, headers=post_headers) - if not "The email you have provided is already associated with an account." in response.text: + if "The email you have provided is already associated with an account." not in response.text: return f"[+] {email} is not REGISTERED" else: return f"[-] {email} is Registerd" From e04cbdee532b53c119e90779f13e647fe7b3cc59 Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Thu, 25 Dec 2025 14:51:01 +0530 Subject: [PATCH 03/30] add: Poc: email availability check for X.com --- user_scanner/email_scan/social/x.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 user_scanner/email_scan/social/x.py diff --git a/user_scanner/email_scan/social/x.py b/user_scanner/email_scan/social/x.py new file mode 100644 index 0000000..f095e43 --- /dev/null +++ b/user_scanner/email_scan/social/x.py @@ -0,0 +1,26 @@ +import asyncio +import httpx + +async def check_x(email): + url = f"https://api.x.com/i/users/email_available.json?email={email}" + headers = { + "user-agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Mobile Safari/537.36", + "x-twitter-client-language": "en", + "x-twitter-active-user": "yes", + "referer": "https://x.com/" + } + + async with httpx.AsyncClient(http2=True) as client: + try: + resp = await client.get(url, headers=headers) + data = resp.json() + if data.get("taken"): + return f"[!] {email} -> REGISTERED" + return f"[*] {email} -> NOT REGISTERED" + except Exception as e: + return f"Error checking {email}: {e}" + +if __name__ == "__main__": + target_email = input("Enter Email: ").strip() + result = asyncio.run(check_x(target_email)) + print(result) From 71da2f891e9d7d3792534d7bb85bf645c462badb Mon Sep 17 00:00:00 2001 From: Vamato <62358492+VamatoHD@users.noreply.github.com> Date: Fri, 26 Dec 2025 20:12:47 +0000 Subject: [PATCH 04/30] start: adding support for email --- tests/test_orchestrator.py | 10 ++++++++ user_scanner/__main__.py | 28 ++++++++++++--------- user_scanner/core/orchestrator.py | 42 ++++++++++++++++++------------- 3 files changed, 51 insertions(+), 29 deletions(-) diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index c04afe2..b012f9b 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -25,6 +25,16 @@ def test_generate_permutations(): assert len(perms) > 1 +def test_generate_permutations_email(): + perms = orchestrator.generate_permutations("john@email.com", "abc", limit=None, is_email=True) + assert "john@email.com" in perms + assert all( + p == "john@email.com" or + (p.startswith("john") and len(p) > len("john@email.com") and p.endswith("@email.com")) + for p in perms + ) + assert len(perms) > 1 + def test_run_module_single_prints_json_and_csv(capsys): module = types.ModuleType("fake.testsite") module.__file__ = "/fake/testsite.py" diff --git a/user_scanner/__main__.py b/user_scanner/__main__.py index 4464808..ed3ade3 100644 --- a/user_scanner/__main__.py +++ b/user_scanner/__main__.py @@ -1,6 +1,7 @@ import argparse import time import sys +import re from user_scanner.cli import printer from user_scanner.core.orchestrator import generate_permutations, load_categories from colorama import Fore, Style @@ -28,9 +29,15 @@ def main(): prog="user-scanner", description="Scan usernames across multiple platforms." ) - parser.add_argument( + + group = parser.add_mutually_exclusive_group(required=True) + + group.add_argument( "-u", "--username", help="Username to scan across platforms" ) + group.add_argument( + "-e", "--email", help="Email to scan across platforms" + ) parser.add_argument( "-c", "--category", choices=load_categories().keys(), help="Scan all platforms in a category" @@ -44,22 +51,18 @@ def main(): parser.add_argument( "-v", "--verbose", action="store_true", help="Enable verbose output" ) - parser.add_argument( "-p", "--permute",type=str,help="Generate username permutations using a string pattern (e.g -p 234)" ) parser.add_argument( "-s", "--stop",type=int,default=MAX_PERMUTATIONS_LIMIT,help="Limit the number of username permutations generated" ) - parser.add_argument( "-d", "--delay",type=float,default=0,help="Delay in seconds between requests (recommended: 1-2 seconds)" ) - parser.add_argument( "-f", "--format", choices=["console", "csv", "json"], default="console", help="Specify the output format (default: console)" ) - parser.add_argument( "-o", "--output", type=str, help="Specify the output file" ) @@ -82,14 +85,14 @@ def main(): check_for_updates() - if not args.username: - parser.print_help() - return - - if Printer.is_console: print_banner() + is_email = args.email is not None + if is_email and not re.findall(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", args.email): + print(R + "[✘] Error: Invalid email." + X) + sys.exit(1) + if args.permute and args.delay == 0 and Printer.is_console: print( Y @@ -97,11 +100,12 @@ def main(): "This may trigger rate limits or IP bans. Use --delay 1 or higher. (Use only if the sites throw errors otherwise ignore)\n" + Style.RESET_ALL) - usernames = [args.username] # Default single username list + name = args.username or args.email #Username or email + usernames = [name] # Default single username list # Added permutation support , generate all possible permutation of given sequence. if args.permute: - usernames = generate_permutations(args.username, args.permute , args.stop) + usernames = generate_permutations(name, args.permute , args.stop, is_email) if Printer.is_console: print( C + f"[+] Generated {len(usernames)} username permutations" + Style.RESET_ALL) diff --git a/user_scanner/core/orchestrator.py b/user_scanner/core/orchestrator.py index 04dd4c0..1cda1a1 100644 --- a/user_scanner/core/orchestrator.py +++ b/user_scanner/core/orchestrator.py @@ -7,11 +7,12 @@ from pathlib import Path from user_scanner.cli.printer import Printer from user_scanner.core.result import Result +from types import ModuleType from typing import Callable, Dict, List from user_scanner.core.helpers import get_site_name, is_last_value -def load_modules(category_path: Path): +def load_modules(category_path: Path) -> List[ModuleType]: modules = [] for file in category_path.glob("*.py"): if file.name == "__init__.py": @@ -26,8 +27,9 @@ def load_modules(category_path: Path): return modules -def load_categories() -> Dict[str, Path]: - root = Path(__file__).resolve().parent.parent / "user_scan" +def load_categories(is_email: bool = False) -> Dict[str, Path]: + folder_name = "email_scan" if is_email else "user_scan" + root = Path(__file__).resolve().parent.parent / folder_name categories = {} for subfolder in root.iterdir(): @@ -39,33 +41,30 @@ def load_categories() -> Dict[str, Path]: return categories -def find_module(name: str): +def find_module(name: str, is_email: bool = False) -> List[ModuleType]: name = name.lower() - matches = [ + return [ module - for category_path in load_categories().values() + for category_path in load_categories(is_email).values() for module in load_modules(category_path) if module.__name__.split(".")[-1].lower() == name ] - return matches -def find_category(module) -> str | None: +def find_category(module: ModuleType) -> str | None: module_file = getattr(module, '__file__', None) if not module_file: return None category = Path(module_file).parent.name.lower() - categories = load_categories() - if category in categories: + if category in load_categories(False) or category in load_categories(True): return category.capitalize() return None - def worker_single(module, username: str) -> Result: func = next((getattr(module, f) for f in dir(module) if f.startswith("validate_") and callable(getattr(module, f))), None) @@ -99,7 +98,6 @@ def run_module_single(module, username: str, printer: Printer, last: bool = True return [result] - def run_checks_category(category_path: Path, username: str, printer: Printer, last: bool = True) -> List[Result]: modules = load_modules(category_path) @@ -196,20 +194,30 @@ def contains(a, b): return (isinstance(a, list) and b in a) or (a == b) return generic_validate(url, inner, **kwargs) -def generate_permutations(username, pattern, limit=None): +def generate_permutations(username: str, pattern: str, limit: int | None = None, is_email: bool = False) -> List[str]: """ Generate all order-based permutations of characters in `pattern` appended after `username`. """ - permutations_set = {username} + if limit and limit <= 0: + return [] + + permutations_set = {username} chars = list(pattern) + domain = "" + if is_email: + username, domain = username.strip().split("@") + # generate permutations of length 1 → len(chars) - for r in range(1, len(chars) + 1): + for r in range(len(chars)): for combo in permutations(chars, r): - permutations_set.add(username + ''.join(combo)) + new = username + ''.join(combo) + if is_email: + new += "@" + domain + permutations_set.add(new) if limit and len(permutations_set) >= limit: - return list(permutations_set)[:limit] + return sorted(permutations_set) return sorted(permutations_set) From e705310ceff1b0de56e7f825228c5bcf9b04592d Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Sat, 27 Dec 2025 15:44:01 +0530 Subject: [PATCH 05/30] restore: print help-menu and add: version flag --- user_scanner/__main__.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/user_scanner/__main__.py b/user_scanner/__main__.py index ed3ade3..49b8149 100644 --- a/user_scanner/__main__.py +++ b/user_scanner/__main__.py @@ -8,6 +8,7 @@ from user_scanner.cli.banner import print_banner from typing import List from user_scanner.core.result import Result +from user_scanner.core.version import load_local_version from user_scanner.core.helpers import is_last_value from user_scanner.utils.updater_logic import check_for_updates from user_scanner.utils.update import update_self @@ -30,7 +31,7 @@ def main(): description="Scan usernames across multiple platforms." ) - group = parser.add_mutually_exclusive_group(required=True) + group = parser.add_mutually_exclusive_group(required=False) group.add_argument( "-u", "--username", help="Username to scan across platforms" @@ -69,7 +70,9 @@ def main(): parser.add_argument( "-U", "--update", action="store_true", help="Update user-scanner to latest version" ) - + parser.add_argument( + "--version", action="store_true", help="Print the current pypi version of the tool" + ) args = parser.parse_args() Printer = printer.Printer(args.format) @@ -83,8 +86,16 @@ def main(): Printer.print_modules(args.category) return + if args.version: + version, _ = load_local_version() + print(f"user-scanner current version -> {G}{version}{X}") + sys.exit(0) check_for_updates() + if not (args.username or args.email): + parser.print_help() + return + if Printer.is_console: print_banner() From 19d8d172850cba163a6ab576a6943bbc7b18fc4b Mon Sep 17 00:00:00 2001 From: Vamato <62358492+VamatoHD@users.noreply.github.com> Date: Sat, 27 Dec 2025 22:03:22 +0000 Subject: [PATCH 06/30] rm: printer; added: formatter --- tests/test_formatter.py | 35 +++++++++ tests/test_orchestrator.py | 14 +--- tests/test_printer.py | 64 ---------------- user_scanner/__main__.py | 76 +++++++++---------- user_scanner/cli/printer.py | 117 ------------------------------ user_scanner/core/formatter.py | 27 +++++++ user_scanner/core/orchestrator.py | 38 ++++------ user_scanner/core/result.py | 16 ++++ 8 files changed, 130 insertions(+), 257 deletions(-) create mode 100644 tests/test_formatter.py delete mode 100644 tests/test_printer.py delete mode 100644 user_scanner/cli/printer.py create mode 100644 user_scanner/core/formatter.py diff --git a/tests/test_formatter.py b/tests/test_formatter.py new file mode 100644 index 0000000..c023eee --- /dev/null +++ b/tests/test_formatter.py @@ -0,0 +1,35 @@ +from user_scanner.core.formatter import into_csv, into_json, indentate, INDENT +from user_scanner.core.result import Result + + +def test_indentate(): + assert indentate("", -1) == "" + assert indentate("", 0) == "" + assert indentate("", 2) == 2 * INDENT + + msg = ("This is a test message\n" + "made to test the identation\n" + "and shouldn't be changed.") + + for i in range(0, 4): + new = indentate(msg, i) + for line in new.split("\n"): + assert line.find(INDENT * i) == 0 + + +def test_get_result_output_formats(): + res = Result.available( + username="alice", site_name="ExampleSite", category="Cat") + + out_console = res.get_console_output() + assert "Available" in out_console + assert "ExampleSite" in out_console + assert "alice" in out_console + + out_json = into_json([res]) + assert '"username": "alice"' in out_json + assert '"site_name": "ExampleSite"' in out_json + + out_csv = into_csv([res]) + assert "alice" in out_csv + assert "ExampleSite" in out_csv diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index b012f9b..15e6dd3 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -1,7 +1,6 @@ import types from types import SimpleNamespace from user_scanner.core import orchestrator -from user_scanner.cli.printer import Printer from user_scanner.core.result import Result @@ -44,16 +43,10 @@ def validate_testsite(username): setattr(module, "validate_testsite", validate_testsite) - p_json = Printer("json") - orchestrator.run_module_single(module, "bob", p_json, last=True) + orchestrator.run_module_single(module, "bob") out = capsys.readouterr().out - assert '"username": "bob"' in out + assert 'bob' in out #Needs to be improved - p_csv = Printer("csv") - orchestrator.run_module_single(module, "bob", p_csv, last=True) - out2 = capsys.readouterr().out - assert "bob" in out2 - assert "Testsite" in out2 or "testsite" in out2 def test_run_checks_category_threaded(monkeypatch, tmp_path): @@ -70,8 +63,7 @@ def validate(username): monkeypatch.setattr(orchestrator, "load_modules", lambda p: [module]) monkeypatch.setattr(orchestrator, "get_site_name", lambda m: "Testsite") - p = Printer("console") - results = orchestrator.run_checks_category(tmp_path, "someone", p, last=True) + results = orchestrator.run_checks_category(tmp_path, "someone") assert isinstance(results, list) assert len(results) == 1 assert results[0].to_number() == 0 # TAKEN diff --git a/tests/test_printer.py b/tests/test_printer.py deleted file mode 100644 index ad398e7..0000000 --- a/tests/test_printer.py +++ /dev/null @@ -1,64 +0,0 @@ -from user_scanner.cli.printer import Printer, indentate, INDENT -from user_scanner.core.result import Result -import pytest - - -def test_indentate(): - assert indentate("", -1) == "" - assert indentate("", 0) == "" - assert indentate("", 2) == 2 * INDENT - - msg = ("This is a test message\n" - "made to test the identation\n" - "and shouldn't be changed.") - - for i in range(0, 4): - new = indentate(msg, i) - for line in new.split("\n"): - assert line.find(INDENT * i) == 0 - - -def test_creation(): - assert Printer("console").mode == "console" - assert Printer("csv").mode == "csv" - assert Printer("json").mode == "json" - - with pytest.raises(ValueError): - Printer(2) - - -def test_is_properties(): - p = Printer("console") - assert p.is_console is True - assert p.is_csv is False - assert p.is_json is False - - p = Printer("csv") - assert p.is_console is False - assert p.is_csv is True - assert p.is_json is False - - p = Printer("json") - assert p.is_console is False - assert p.is_csv is False - assert p.is_json is True - - -def test_get_result_output_formats(): - res = Result.available(username="alice", site_name="ExampleSite", category="Cat") - - p_console = Printer("console") - out_console = p_console.get_result_output(res) - assert "Available" in out_console - assert "ExampleSite" in out_console - assert "alice" in out_console - - p_json = Printer("json") - out_json = p_json.get_result_output(res) - assert '"username": "alice"' in out_json - assert '"site_name": "ExampleSite"' in out_json - - p_csv = Printer("csv") - out_csv = p_csv.get_result_output(res) - assert "alice" in out_csv - assert "ExampleSite" in out_csv diff --git a/user_scanner/__main__.py b/user_scanner/__main__.py index 49b8149..00669f7 100644 --- a/user_scanner/__main__.py +++ b/user_scanner/__main__.py @@ -2,14 +2,14 @@ import time import sys import re -from user_scanner.cli import printer -from user_scanner.core.orchestrator import generate_permutations, load_categories +from user_scanner.core.orchestrator import generate_permutations, load_categories, load_modules from colorama import Fore, Style from user_scanner.cli.banner import print_banner from typing import List from user_scanner.core.result import Result from user_scanner.core.version import load_local_version from user_scanner.core.helpers import is_last_value +from user_scanner.core import formatter from user_scanner.utils.updater_logic import check_for_updates from user_scanner.utils.update import update_self @@ -21,7 +21,7 @@ X = Fore.RESET -MAX_PERMUTATIONS_LIMIT = 100 # To prevent excessive generation +MAX_PERMUTATIONS_LIMIT = 100 # To prevent excessive generation def main(): @@ -53,13 +53,13 @@ def main(): "-v", "--verbose", action="store_true", help="Enable verbose output" ) parser.add_argument( - "-p", "--permute",type=str,help="Generate username permutations using a string pattern (e.g -p 234)" + "-p", "--permute", type=str, help="Generate username permutations using a string pattern (e.g -p 234)" ) parser.add_argument( - "-s", "--stop",type=int,default=MAX_PERMUTATIONS_LIMIT,help="Limit the number of username permutations generated" + "-s", "--stop", type=int, default=MAX_PERMUTATIONS_LIMIT, help="Limit the number of username permutations generated" ) parser.add_argument( - "-d", "--delay",type=float,default=0,help="Delay in seconds between requests (recommended: 1-2 seconds)" + "-d", "--delay", type=float, default=0, help="Delay in seconds between requests (recommended: 1-2 seconds)" ) parser.add_argument( "-f", "--format", choices=["console", "csv", "json"], default="console", help="Specify the output format (default: console)" @@ -75,15 +75,21 @@ def main(): ) args = parser.parse_args() - Printer = printer.Printer(args.format) - if args.update is True: update_self() print(f"[{G}+{X}] {G}Update successful. Please restart the tool.{X}") sys.exit(0) if args.list: - Printer.print_modules(args.category) + categories = load_categories() + for cat_name, cat_path in categories.items(): + modules = load_modules(cat_path) + print(Fore.MAGENTA + + f"\n== {cat_name.upper()} SITES =={Style.RESET_ALL}") + for module in modules: + site_name = module.__name__.split(".")[-1].capitalize() + print(f" - {site_name}") + return if args.version: @@ -93,57 +99,53 @@ def main(): check_for_updates() if not (args.username or args.email): - parser.print_help() - return + parser.print_help() + return - if Printer.is_console: - print_banner() + print_banner() is_email = args.email is not None if is_email and not re.findall(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", args.email): print(R + "[✘] Error: Invalid email." + X) sys.exit(1) - if args.permute and args.delay == 0 and Printer.is_console: + if args.permute and args.delay == 0: print( - Y - + "[!] Warning: You're generating multiple usernames with NO delay between requests. " - "This may trigger rate limits or IP bans. Use --delay 1 or higher. (Use only if the sites throw errors otherwise ignore)\n" - + Style.RESET_ALL) + Y + + "[!] Warning: You're generating multiple usernames with NO delay between requests. " + "This may trigger rate limits or IP bans. Use --delay 1 or higher. (Use only if the sites throw errors otherwise ignore)\n" + + Style.RESET_ALL) - name = args.username or args.email #Username or email + name = args.username or args.email # Username or email usernames = [name] # Default single username list # Added permutation support , generate all possible permutation of given sequence. if args.permute: - usernames = generate_permutations(name, args.permute , args.stop, is_email) - if Printer.is_console: - print( - C + f"[+] Generated {len(usernames)} username permutations" + Style.RESET_ALL) + usernames = generate_permutations( + name, args.permute, args.stop, is_email) + print( + C + f"[+] Generated {len(usernames)} username permutations" + Style.RESET_ALL) if args.module and "." in args.module: args.module = args.module.replace(".", "_") - def run_all_usernames(func, arg = None) -> List[Result]: + def run_all_usernames(func, arg=None) -> List[Result]: """ Executes a function for all given usernames. Made in order to simplify main() """ results = [] - print(Printer.get_start()) for i, name in enumerate(usernames): - is_last = i == len(usernames) - 1 + is_last = is_last_value(usernames, i) if arg is None: - results.extend(func(name, Printer, is_last)) + results.extend(func(name)) else: - results.extend(func(arg, name, Printer, is_last)) + results.extend(func(arg, name)) if args.delay > 0 and not is_last: time.sleep(args.delay) - if Printer.is_json: - print(Printer.get_end()) return results - results = [] + results = [] if args.module: # Single module search across all categories @@ -171,7 +173,7 @@ def run_all_usernames(func, arg = None) -> List[Result]: if not args.output: return - if args.output and Printer.is_console: + if args.output and args.format == "console": msg = ( "\n[!] The console format cannot be " f"written to file: '{args.output}'." @@ -179,14 +181,8 @@ def run_all_usernames(func, arg = None) -> List[Result]: print(R + msg + Style.RESET_ALL) return - content = Printer.get_start() - - for i,result in enumerate(results): - char = "" if Printer.is_csv or is_last_value(results, i) else "," - content += "\n" + Printer.get_result_output(result) + char - - if Printer.is_json: - content += "\n" + Printer.get_end() + + content = formatter.into_csv(results) if args.format == "csv" else formatter.into_json(results) with open(args.output, "a", encoding="utf-8") as f: f.write(content) diff --git a/user_scanner/cli/printer.py b/user_scanner/cli/printer.py deleted file mode 100644 index 01f035f..0000000 --- a/user_scanner/cli/printer.py +++ /dev/null @@ -1,117 +0,0 @@ -from colorama import Fore, Style -from typing import Literal -from user_scanner.core.result import Result, Status - -INDENT = " " -CSV_HEADER = "username,category,site_name,status,url,reason" - - -def indentate(msg: str, indent: int): - if indent <= 0: - return msg - tabs = INDENT * indent - return "\n".join([f"{tabs}{line}" for line in msg.split("\n")]) - - -class Printer: - def __init__(self, format: Literal["console", "csv", "json"]) -> None: - if format not in ["console", "csv", "json"]: - raise ValueError(f"Invalid output-format: {format}") - self.mode: str = format - self.indent: int = 0 - - @property - def is_console(self) -> bool: - return self.mode == "console" - - @property - def is_csv(self) -> bool: - return self.mode == "csv" - - @property - def is_json(self) -> bool: - return self.mode == "json" - - def get_start(self, json_char: str = "[") -> str: - if self.is_json: - self.indent += 1 - return indentate(json_char, self.indent - 1) - elif self.is_csv: - return CSV_HEADER - return "" - - def get_end(self, json_char: str = "]") -> str: - if not self.is_json: - return "" - self.indent = max(self.indent - 1, 0) - return indentate(json_char, self.indent) - - def get_result_output(self, result: Result) -> str: - # In principle result should always have this - site_name = result.site_name - username = result.username - - match (result.status, self.mode): - case (Status.AVAILABLE, "console"): - return f"{INDENT}{Fore.GREEN}[✔] {site_name} ({username}): Available{Style.RESET_ALL}" - - case (Status.TAKEN, "console"): - return f"{INDENT}{Fore.RED}[✘] {site_name} ({username}): Taken{Style.RESET_ALL}" - - case (Status.ERROR, "console"): - reason = "" - if isinstance(result, Result) and result.has_reason(): - reason = f" ({result.get_reason()})" - return f"{INDENT}{Fore.YELLOW}[!] {site_name} ({username}): Error{reason}{Style.RESET_ALL}" - - case (_, "json"): - return indentate(result.to_json().replace("\t", INDENT), self.indent) - - case (_, "csv"): - return result.to_csv() - - return "" - - def print_modules(self, category: str | None = None): - from user_scanner.core.orchestrator import load_categories, load_modules - categories = load_categories() - categories_to_list = [category] if category else categories.keys() - - # Print the start - if self.is_json: - print(self.get_start("{")) - elif self.is_csv: - print("category,site_name") - - for i, cat_name in enumerate(categories_to_list): - path = categories[cat_name] - modules = load_modules(path) - - # Print for each category - match self.mode: - case "console": - print(Fore.MAGENTA + - f"\n== {cat_name.upper()} SITES =={Style.RESET_ALL}") - case "json": - print(self.get_start(f"\"{cat_name}\": [")) - - for j, module in enumerate(modules): - is_last = j == len(modules) - 1 - site_name = module.__name__.split(".")[-1].capitalize() - - # Print for each site name - match self.mode: - case "console": - print(f"{INDENT}- {site_name}") - case "json": - msg = f"\"{site_name}\"" + ("" if is_last else ",") - print(indentate(msg, self.indent)) - case "csv": - print(f"{cat_name},{site_name}") - - if self.is_json: - is_last = i == len(categories_to_list) - 1 - print(self.get_end("]" if is_last else "],")) - - if self.is_json: - print(self.get_end("}")) diff --git a/user_scanner/core/formatter.py b/user_scanner/core/formatter.py new file mode 100644 index 0000000..1018861 --- /dev/null +++ b/user_scanner/core/formatter.py @@ -0,0 +1,27 @@ +from user_scanner.core.result import Result +from typing import List + +INDENT = " " +CSV_HEADER = "username,category,site_name,status,reason" + + +def indentate(msg: str, indent: int): + if indent <= 0: + return msg + tabs = INDENT * indent + return "\n".join([f"{tabs}{line}" for line in msg.split("\n")]) + + +def into_json(results: List[Result]) -> str: + res = "[\n" + + for i, result in enumerate(results): + is_last = i == len(results) - 1 + end = "" if is_last else "," + res += indentate(result.to_json().replace("\t", INDENT), 1) + end + "\n" + + return res + "]" + + +def into_csv(results: List[Result]) -> str: + return CSV_HEADER + "\n" + "\n".join(result.to_csv() for result in results) diff --git a/user_scanner/core/orchestrator.py b/user_scanner/core/orchestrator.py index 1cda1a1..9666fe4 100644 --- a/user_scanner/core/orchestrator.py +++ b/user_scanner/core/orchestrator.py @@ -5,11 +5,10 @@ from itertools import permutations import httpx from pathlib import Path -from user_scanner.cli.printer import Printer from user_scanner.core.result import Result from types import ModuleType from typing import Callable, Dict, List -from user_scanner.core.helpers import get_site_name, is_last_value +from user_scanner.core.helpers import get_site_name def load_modules(category_path: Path) -> List[ModuleType]: @@ -82,57 +81,46 @@ def worker_single(module, username: str) -> Result: return Result.error(e, site_name=site_name, username=username) -def run_module_single(module, username: str, printer: Printer, last: bool = True) -> List[Result]: +def run_module_single(module, username: str) -> List[Result]: result = worker_single(module, username) category = find_category(module) if category: result.update(category=category) - get_site_name(module) - msg = printer.get_result_output(result) - if not last and printer.is_json: - msg += "," - print(msg) + print(result.get_console_output()) return [result] -def run_checks_category(category_path: Path, username: str, printer: Printer, last: bool = True) -> List[Result]: +def run_checks_category(category_path: Path, username: str) -> List[Result]: modules = load_modules(category_path) category_name = category_path.stem.capitalize() - if printer.is_console: - print(f"\n{Fore.MAGENTA}== {category_name} SITES =={Style.RESET_ALL}") + print(f"\n{Fore.MAGENTA}== {category_name} SITES =={Style.RESET_ALL}") results = [] with ThreadPoolExecutor(max_workers=10) as executor: exec_map = executor.map(lambda m: worker_single(m, username), modules) - for i, result in enumerate(exec_map): - result.update(category = category_name) + for result in exec_map: + result.update(category=category_name) results.append(result) - is_last = last and is_last_value(modules, i) - get_site_name(modules[i]) - msg = printer.get_result_output(result) - if not is_last and printer.is_json: - msg += "," - print(msg) + print(result.get_console_output()) return results -def run_checks(username: str, printer: Printer, last: bool = True) -> List[Result]: - if printer.is_console: - print(f"\n{Fore.CYAN} Checking username: {username}{Style.RESET_ALL}") +def run_checks(username: str) -> List[Result]: + + print(f"\n{Fore.CYAN} Checking username: {username}{Style.RESET_ALL}") results = [] categories = list(load_categories().values()) - for i, category_path in enumerate(categories): - last_cat = last and (i == len(categories) - 1) - temp = run_checks_category(category_path, username, printer, last_cat) + for category_path in categories: + temp = run_checks_category(category_path, username) results.extend(temp) return results diff --git a/user_scanner/core/result.py b/user_scanner/core/result.py index 865657f..025be4c 100644 --- a/user_scanner/core/result.py +++ b/user_scanner/core/result.py @@ -1,4 +1,5 @@ from enum import Enum +from colorama import Fore, Style DEBUG_MSG = """Result {{ status: {status}, @@ -123,3 +124,18 @@ def __eq__(self, other): return NotImplemented + def get_console_output(self) -> str: + site_name = self.site_name + username = self.username + + if self == Status.AVAILABLE: + return f" {Fore.GREEN}[✔] {site_name} ({username}): Available{Style.RESET_ALL}" + elif self == Status.TAKEN: + return f" {Fore.RED}[✘] {site_name} ({username}): Taken{Style.RESET_ALL}" + elif self == Status.ERROR: + reason = "" + if isinstance(self, Result) and self.has_reason(): + reason = f" ({self.get_reason()})" + return f" {Fore.YELLOW}[!] {site_name} ({username}): Error{reason}{Style.RESET_ALL}" + + return "" From 7afad3e7c49aabad683cc1ab47698db9e8b45010 Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 10:20:18 +0530 Subject: [PATCH 07/30] feat(email-osint): implement email osint logic for -e flag --- user_scanner/__main__.py | 255 ++++++++++++++++++++------------------- 1 file changed, 130 insertions(+), 125 deletions(-) diff --git a/user_scanner/__main__.py b/user_scanner/__main__.py index 00669f7..63b409c 100644 --- a/user_scanner/__main__.py +++ b/user_scanner/__main__.py @@ -2,17 +2,37 @@ import time import sys import re -from user_scanner.core.orchestrator import generate_permutations, load_categories, load_modules from colorama import Fore, Style -from user_scanner.cli.banner import print_banner from typing import List + +from user_scanner.cli.banner import print_banner from user_scanner.core.result import Result from user_scanner.core.version import load_local_version -from user_scanner.core.helpers import is_last_value from user_scanner.core import formatter from user_scanner.utils.updater_logic import check_for_updates from user_scanner.utils.update import update_self +from user_scanner.core.helpers import ( + load_categories, + load_modules, + find_module, + is_last_value, + get_site_name +) + +from user_scanner.core.orchestrator import ( + generate_permutations, + run_checks, + run_checks_category, + run_module_single +) + +from user_scanner.core.email_orchestrator import ( + run_email_full_batch, + run_email_category_batch, + run_email_module_batch +) + # Color configs R = Fore.RED G = Fore.GREEN @@ -20,172 +40,157 @@ Y = Fore.YELLOW X = Fore.RESET - -MAX_PERMUTATIONS_LIMIT = 100 # To prevent excessive generation +MAX_PERMUTATIONS_LIMIT = 100 def main(): - parser = argparse.ArgumentParser( prog="user-scanner", - description="Scan usernames across multiple platforms." + description="Scan usernames or emails across multiple platforms." ) group = parser.add_mutually_exclusive_group(required=False) + group.add_argument("-u", "--username", + help="Username to scan across platforms") + group.add_argument("-e", "--email", help="Email to scan across platforms") + + parser.add_argument("-c", "--category", + help="Scan all platforms in a category") + + parser.add_argument("-m", "--module", help="Scan a single specific module") + + parser.add_argument("-l", "--list", action="store_true", + help="List all available modules for username scanning") + + parser.add_argument("-v", "--verbose", action="store_true", + help="Enable verbose output") + + parser.add_argument("-p", "--permute", type=str, + help="Generate permutations using a pattern") + + parser.add_argument("-s", "--stop", type=int, + default=MAX_PERMUTATIONS_LIMIT, help="Limit permutations") + + parser.add_argument("-d", "--delay", type=float, + default=0, help="Delay between requests") - group.add_argument( - "-u", "--username", help="Username to scan across platforms" - ) - group.add_argument( - "-e", "--email", help="Email to scan across platforms" - ) - parser.add_argument( - "-c", "--category", choices=load_categories().keys(), - help="Scan all platforms in a category" - ) - parser.add_argument( - "-m", "--module", help="Scan a single specific module across all categories" - ) - parser.add_argument( - "-l", "--list", action="store_true", help="List all available modules by category" - ) - parser.add_argument( - "-v", "--verbose", action="store_true", help="Enable verbose output" - ) - parser.add_argument( - "-p", "--permute", type=str, help="Generate username permutations using a string pattern (e.g -p 234)" - ) - parser.add_argument( - "-s", "--stop", type=int, default=MAX_PERMUTATIONS_LIMIT, help="Limit the number of username permutations generated" - ) - parser.add_argument( - "-d", "--delay", type=float, default=0, help="Delay in seconds between requests (recommended: 1-2 seconds)" - ) - parser.add_argument( - "-f", "--format", choices=["console", "csv", "json"], default="console", help="Specify the output format (default: console)" - ) - parser.add_argument( - "-o", "--output", type=str, help="Specify the output file" - ) parser.add_argument( - "-U", "--update", action="store_true", help="Update user-scanner to latest version" - ) + "-f", "--format", choices=["console", "csv", "json"], default="console", help="Output format") + + parser.add_argument("-o", "--output", type=str, help="Output file path") + parser.add_argument( - "--version", action="store_true", help="Print the current pypi version of the tool" - ) + "-U", "--update", action="store_true", help="Update the tool") + + parser.add_argument("--version", action="store_true", help="Print version") + args = parser.parse_args() - if args.update is True: + if args.update: update_self() print(f"[{G}+{X}] {G}Update successful. Please restart the tool.{X}") sys.exit(0) + if args.version: + version, _ = load_local_version() + print(f"user-scanner current version -> {G}{version}{X}") + sys.exit(0) + if args.list: - categories = load_categories() + is_email_list = args.email is not None + categories = load_categories(is_email_list) for cat_name, cat_path in categories.items(): modules = load_modules(cat_path) print(Fore.MAGENTA + f"\n== {cat_name.upper()} SITES =={Style.RESET_ALL}") for module in modules: - site_name = module.__name__.split(".")[-1].capitalize() - print(f" - {site_name}") - + print(f" - {get_site_name(module)}") return - if args.version: - version, _ = load_local_version() - print(f"user-scanner current version -> {G}{version}{X}") - sys.exit(0) - check_for_updates() - if not (args.username or args.email): parser.print_help() return + check_for_updates() print_banner() is_email = args.email is not None if is_email and not re.findall(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", args.email): - print(R + "[✘] Error: Invalid email." + X) + print(R + "[✘] Error: Invalid email format." + X) sys.exit(1) - if args.permute and args.delay == 0: - print( - Y - + "[!] Warning: You're generating multiple usernames with NO delay between requests. " - "This may trigger rate limits or IP bans. Use --delay 1 or higher. (Use only if the sites throw errors otherwise ignore)\n" - + Style.RESET_ALL) - - name = args.username or args.email # Username or email - usernames = [name] # Default single username list + target_name = args.username or args.email + targets = [target_name] - # Added permutation support , generate all possible permutation of given sequence. if args.permute: - usernames = generate_permutations( - name, args.permute, args.stop, is_email) + targets = generate_permutations( + target_name, args.permute, args.stop, is_email) print( - C + f"[+] Generated {len(usernames)} username permutations" + Style.RESET_ALL) - - if args.module and "." in args.module: - args.module = args.module.replace(".", "_") - - def run_all_usernames(func, arg=None) -> List[Result]: - """ - Executes a function for all given usernames. - Made in order to simplify main() - """ - results = [] - for i, name in enumerate(usernames): - is_last = is_last_value(usernames, i) - if arg is None: - results.extend(func(name)) - else: - results.extend(func(arg, name)) - if args.delay > 0 and not is_last: - time.sleep(args.delay) - return results + C + f"[+] Generated {len(targets)} permutations" + Style.RESET_ALL) results = [] - if args.module: - # Single module search across all categories - from user_scanner.core.orchestrator import run_module_single, find_module - modules = find_module(args.module) - - if len(modules) > 0: - for module in modules: - results.extend(run_all_usernames(run_module_single, module)) + if is_email: + if args.module: + modules = find_module(args.module, is_email=True) + if modules: + for module in modules: + results.extend(run_email_module_batch(module, targets)) + else: + print( + R + f"[!] Email module '{args.module}' not found." + Style.RESET_ALL) + elif args.category: + cat_path = load_categories(is_email=True).get(args.category) + if cat_path: + results.extend(run_email_category_batch(cat_path, targets)) + else: + print( + R + f"[!] Email category '{args.category}' not found." + Style.RESET_ALL) else: - print( - R + f"[!] Module '{args.module}' not found in any category." + Style.RESET_ALL) - - elif args.category: - # Category-wise scan - category_package = load_categories().get(args.category) - from user_scanner.core.orchestrator import run_checks_category - results = run_all_usernames(run_checks_category, category_package) - + results = run_email_full_batch(targets) else: - # Full scan - from user_scanner.core.orchestrator import run_checks - results = run_all_usernames(run_checks) - - if not args.output: - return - - if args.output and args.format == "console": - msg = ( - "\n[!] The console format cannot be " - f"written to file: '{args.output}'." - ) - print(R + msg + Style.RESET_ALL) - return - - - content = formatter.into_csv(results) if args.format == "csv" else formatter.into_json(results) + def run_all_targets(func, arg=None) -> List[Result]: + all_results = [] + for i, name in enumerate(targets): + is_last = is_last_value(targets, i) + res = func(name) if arg is None else func(arg, name) + if isinstance(res, list): + all_results.extend(res) + else: + all_results.append(res) + if args.delay > 0 and not is_last: + time.sleep(args.delay) + return all_results + + if args.module: + modules = find_module(args.module, is_email=False) + if modules: + for mod in modules: + results.extend(run_all_targets(run_module_single, mod)) + else: + print( + R + f"[!] Module '{args.module}' not found." + Style.RESET_ALL) + elif args.category: + cat_path = load_categories(is_email=False).get(args.category) + if cat_path: + results.extend(run_all_targets(run_checks_category, cat_path)) + else: + print( + R + f"[!] Category '{args.category}' not found." + Style.RESET_ALL) + else: + results = run_all_targets(run_checks) - with open(args.output, "a", encoding="utf-8") as f: - f.write(content) + if args.output: + if args.format == "console": + print( + R + f"\n[!] Console format cannot be saved to '{args.output}'." + Style.RESET_ALL) + return + + content = formatter.into_csv( + results) if args.format == "csv" else formatter.into_json(results) + with open(args.output, "a", encoding="utf-8") as f: + f.write(content) + print(G + f"\n[+] Results saved to {args.output}" + Style.RESET_ALL) if __name__ == "__main__": From 0f70e656cf41498c62cdd9a0e4e1afc80acaaa09 Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 10:25:49 +0530 Subject: [PATCH 08/30] mv: move the helper function from orchestrator.py to helpers.py --- user_scanner/core/helpers.py | 59 ++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/user_scanner/core/helpers.py b/user_scanner/core/helpers.py index 28c65b8..6d9327d 100644 --- a/user_scanner/core/helpers.py +++ b/user_scanner/core/helpers.py @@ -1,9 +1,68 @@ +# helpers.py +import importlib +import importlib.util +from types import ModuleType +from pathlib import Path +from typing import Dict, List + def get_site_name(module) -> str: name = module.__name__.split('.')[-1].capitalize().replace("_", ".") if name == "X": return "X (Twitter)" return name +def load_modules(category_path: Path) -> List[ModuleType]: + modules = [] + for file in category_path.glob("*.py"): + if file.name == "__init__.py": + continue + spec = importlib.util.spec_from_file_location(file.stem, str(file)) + if spec is None or spec.loader is None: + continue + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + modules.append(module) + return modules + + +def load_categories(is_email: bool = False) -> Dict[str, Path]: + folder_name = "email_scan" if is_email else "user_scan" + root = Path(__file__).resolve().parent.parent / folder_name + categories = {} + + for subfolder in root.iterdir(): + if subfolder.is_dir() and \ + subfolder.name.lower() not in ["cli", "utils", "core"] and \ + "__" not in subfolder.name: # Removes __pycache__ + categories[subfolder.name] = subfolder.resolve() + + return categories + + +def find_module(name: str, is_email: bool = False) -> List[ModuleType]: + name = name.lower() + + return [ + module + for category_path in load_categories(is_email).values() + for module in load_modules(category_path) + if module.__name__.split(".")[-1].lower() == name + ] + + +def find_category(module: ModuleType) -> str | None: + + module_file = getattr(module, '__file__', None) + if not module_file: + return None + + category = Path(module_file).parent.name.lower() + if category in load_categories(False) or category in load_categories(True): + return category.capitalize() + + return None + def is_last_value(values, i: int) -> bool: return i == len(values) - 1 From 70c4204c41bbd34aadbc53493ae2c7e17b4bb7cd Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 10:28:06 +0530 Subject: [PATCH 09/30] feat(email-osint): add email_orchestrator for separate email OSINT flow --- user_scanner/core/email_orchestrator.py | 73 +++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 user_scanner/core/email_orchestrator.py diff --git a/user_scanner/core/email_orchestrator.py b/user_scanner/core/email_orchestrator.py new file mode 100644 index 0000000..75fb62b --- /dev/null +++ b/user_scanner/core/email_orchestrator.py @@ -0,0 +1,73 @@ +import asyncio +from pathlib import Path +from typing import List +from types import ModuleType + +from user_scanner.core.helpers import load_categories, load_modules, find_category +from user_scanner.core.result import Result + + +MAX_CONCURRENT_REQUESTS = 10 + + +async def _async_worker(module: ModuleType, email: str, sem: asyncio.Semaphore) -> Result: + async with sem: + module_name = module.__name__.split(".")[-1] + func_name = f"validate_{module_name}" + + if not hasattr(module, func_name): + return Result.error(f"Function {func_name} not found") + + func = getattr(module, func_name) + + try: + res = func(email) + result = await res if asyncio.iscoroutine(res) else res + except Exception as e: + result = Result.error(e) + + cat_name = find_category(module) or "Email" + result.update( + site_name=module_name.capitalize(), + username=email, + category=cat_name, + is_email=True, + ) + + print(result.get_console_output()) + return result + + +async def _run_batch(modules: List[ModuleType], emails: List[str]) -> List[Result]: + sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) + tasks = [] + + for module in modules: + for email in emails: + tasks.append(_async_worker(module, email, sem)) + + if not tasks: + return [] + + return list(await asyncio.gather(*tasks)) + + +def run_email_module_batch(module: ModuleType, emails: List[str]) -> List[Result]: + return asyncio.run(_run_batch([module], emails)) + + +def run_email_category_batch(category_path: Path, emails: List[str]) -> List[Result]: + modules = load_modules(category_path) + return asyncio.run(_run_batch(modules, emails)) + + +def run_email_full_batch(emails: List[str]) -> List[Result]: + categories = load_categories(is_email=True) + all_results = [] + + for cat_path in categories.values(): + modules = load_modules(cat_path) + cat_results = asyncio.run(_run_batch(modules, emails)) + all_results.extend(cat_results) + + return all_results From 0d723b94aa8903c54b958c88be47132c0c16db77 Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 10:31:07 +0530 Subject: [PATCH 10/30] feat(email-osint): add new logics for email OSINT result outputs --- user_scanner/core/result.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/user_scanner/core/result.py b/user_scanner/core/result.py index 025be4c..cd6dd23 100644 --- a/user_scanner/core/result.py +++ b/user_scanner/core/result.py @@ -36,8 +36,16 @@ class Status(Enum): AVAILABLE = 1 ERROR = 2 + def to_label(self, is_email=False): + """Returns the appropriate text label based on the scan type""" + if self == Status.ERROR: + return "Error" + if is_email: + return "Registered" if self == Status.TAKEN else "Not Registered" + return "Taken" if self == Status.TAKEN else "Available" + def __str__(self): - return super().__str__().split(".")[1].capitalize() + return self.to_label(is_email=False) class Result: @@ -48,10 +56,11 @@ def __init__(self, status: Status, reason: str | Exception | None = None, **kwar self.username = None self.site_name = None self.category = None + self.is_email = False # Track if this is an email result self.update(**kwargs) def update(self, **kwargs): - for field in ("username", "site_name", "category"): + for field in ("username", "site_name", "category", "is_email"): if field in kwargs and kwargs[field] is not None: setattr(self, field, kwargs[field]) @@ -93,7 +102,7 @@ def get_reason(self) -> str: def as_dict(self) -> dict: return { - "status": self.status, + "status": self.status.to_label(self.is_email), # Use dynamic labels "reason": self.get_reason(), "username": self.username, "site_name": self.site_name, @@ -127,15 +136,16 @@ def __eq__(self, other): def get_console_output(self) -> str: site_name = self.site_name username = self.username + status_text = self.status.to_label(self.is_email) if self == Status.AVAILABLE: - return f" {Fore.GREEN}[✔] {site_name} ({username}): Available{Style.RESET_ALL}" + return f" {Fore.GREEN}[✔] {site_name} ({username}): {status_text}{Style.RESET_ALL}" elif self == Status.TAKEN: - return f" {Fore.RED}[✘] {site_name} ({username}): Taken{Style.RESET_ALL}" + return f" {Fore.RED}[✘] {site_name} ({username}): {status_text}{Style.RESET_ALL}" elif self == Status.ERROR: reason = "" if isinstance(self, Result) and self.has_reason(): reason = f" ({self.get_reason()})" - return f" {Fore.YELLOW}[!] {site_name} ({username}): Error{reason}{Style.RESET_ALL}" - + return f" {Fore.YELLOW}[!] {site_name} ({username}): {status_text}{reason}{Style.RESET_ALL}" + return "" From 16ff86ff1b4e8c9d173f258ed270188e8d6415bd Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 10:33:44 +0530 Subject: [PATCH 11/30] refactor: move helper functions to helpers.py --- user_scanner/core/orchestrator.py | 60 ++----------------------------- 1 file changed, 2 insertions(+), 58 deletions(-) diff --git a/user_scanner/core/orchestrator.py b/user_scanner/core/orchestrator.py index 9666fe4..3cea3b0 100644 --- a/user_scanner/core/orchestrator.py +++ b/user_scanner/core/orchestrator.py @@ -1,67 +1,11 @@ -import importlib -import importlib.util from colorama import Fore, Style from concurrent.futures import ThreadPoolExecutor from itertools import permutations import httpx from pathlib import Path from user_scanner.core.result import Result -from types import ModuleType -from typing import Callable, Dict, List -from user_scanner.core.helpers import get_site_name - - -def load_modules(category_path: Path) -> List[ModuleType]: - modules = [] - for file in category_path.glob("*.py"): - if file.name == "__init__.py": - continue - spec = importlib.util.spec_from_file_location(file.stem, str(file)) - if spec is None or spec.loader is None: - continue - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - - modules.append(module) - return modules - - -def load_categories(is_email: bool = False) -> Dict[str, Path]: - folder_name = "email_scan" if is_email else "user_scan" - root = Path(__file__).resolve().parent.parent / folder_name - categories = {} - - for subfolder in root.iterdir(): - if subfolder.is_dir() and \ - subfolder.name.lower() not in ["cli", "utils", "core"] and \ - "__" not in subfolder.name: # Removes __pycache__ - categories[subfolder.name] = subfolder.resolve() - - return categories - - -def find_module(name: str, is_email: bool = False) -> List[ModuleType]: - name = name.lower() - - return [ - module - for category_path in load_categories(is_email).values() - for module in load_modules(category_path) - if module.__name__.split(".")[-1].lower() == name - ] - - -def find_category(module: ModuleType) -> str | None: - - module_file = getattr(module, '__file__', None) - if not module_file: - return None - - category = Path(module_file).parent.name.lower() - if category in load_categories(False) or category in load_categories(True): - return category.capitalize() - - return None +from typing import Callable, List +from user_scanner.core.helpers import find_category, get_site_name, load_categories, load_modules def worker_single(module, username: str) -> Result: From c71001e388d6779e76bcf25409f8462a6a78e8b9 Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 10:45:35 +0530 Subject: [PATCH 12/30] refactor: changes made to work with email_orchestrator and asyncio --- user_scanner/email_scan/social/instagram.py | 31 ++++++--------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/user_scanner/email_scan/social/instagram.py b/user_scanner/email_scan/social/instagram.py index 53efc94..32d7774 100644 --- a/user_scanner/email_scan/social/instagram.py +++ b/user_scanner/email_scan/social/instagram.py @@ -1,9 +1,9 @@ -import asyncio import httpx +from user_scanner.core.result import Result -async def check_instagram(email): - ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36" - async with httpx.AsyncClient(headers={"user-agent": ua}, http2=True) as client: +async def _check(email): + USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36" + async with httpx.AsyncClient(headers={"user-agent": USER_AGENT}, http2=True) as client: await client.get("https://www.instagram.com/") csrf = client.cookies.get("csrftoken") @@ -14,27 +14,14 @@ async def check_instagram(email): "referer": "https://www.instagram.com/accounts/password/reset/" } - resp = await client.post( + response = await client.post( "https://www.instagram.com/api/v1/web/accounts/account_recovery_send_ajax/", data={"email_or_username": email}, headers=headers ) - data = resp.json() - if data.get("status") == "ok": - return f"[!] {email} -> USED (Account exists)" - return f"[*] {email} -> NOT USED" - -if __name__ == "__main__": - target = input("Enter the Email: ") - print(asyncio.run(check_instagram(target))) - - - - - - - - - + data = response.json() + return Result.taken() if data.get("status") == "ok" else Result.available() +def validate_instagram(email: str) -> Result: + return _check(email) From d4b19720a4ee0cab891608e31892d1e4380066ff Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 11:12:11 +0530 Subject: [PATCH 13/30] fix(x.py): minor issues and refactor to make it work with new orchestrator --- user_scanner/email_scan/social/x.py | 40 +++++++++++++++++++---------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/user_scanner/email_scan/social/x.py b/user_scanner/email_scan/social/x.py index f095e43..d4da6f7 100644 --- a/user_scanner/email_scan/social/x.py +++ b/user_scanner/email_scan/social/x.py @@ -1,26 +1,38 @@ -import asyncio import httpx +from user_scanner.core.result import Result -async def check_x(email): - url = f"https://api.x.com/i/users/email_available.json?email={email}" +async def _check(email): + url = "https://api.x.com/i/users/email_available.json" + params = {"email": email} headers = { "user-agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Mobile Safari/537.36", + "accept-encoding": "gzip, deflate, br, zstd", + "sec-ch-ua-platform": "\"Android\"", + "sec-ch-ua": "\"Google Chrome\";v=\"143\", \"Chromium\";v=\"143\", \"Not A(Brand\";v=\"24\"", "x-twitter-client-language": "en", + "sec-ch-ua-mobile": "?1", "x-twitter-active-user": "yes", - "referer": "https://x.com/" + "origin": "https://x.com", + "sec-fetch-site": "same-site", + "sec-fetch-mode": "cors", + "sec-fetch-dest": "empty", + "referer": "https://x.com/", + "accept-language": "en-US,en;q=0.9", + "priority": "u=1, i" } async with httpx.AsyncClient(http2=True) as client: try: - resp = await client.get(url, headers=headers) - data = resp.json() - if data.get("taken"): - return f"[!] {email} -> REGISTERED" - return f"[*] {email} -> NOT REGISTERED" + response = await client.get(url, params=params, headers=headers) + data = response.json() + taken_bool= data["taken"] + if taken_bool is True: + return Result.taken() + + elif taken_bool is False: + return Result.available() except Exception as e: - return f"Error checking {email}: {e}" + return Result.error(e) -if __name__ == "__main__": - target_email = input("Enter Email: ").strip() - result = asyncio.run(check_x(target_email)) - print(result) +def validate_x(email: str) -> Result: + return _check(email) From 2a7ff6629d12c8f8ab8a063c70803b823cee2221 Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 12:10:02 +0530 Subject: [PATCH 14/30] fix(x.py): improve error handling and rate-limit messaging --- user_scanner/email_scan/social/x.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/user_scanner/email_scan/social/x.py b/user_scanner/email_scan/social/x.py index d4da6f7..2a3599c 100644 --- a/user_scanner/email_scan/social/x.py +++ b/user_scanner/email_scan/social/x.py @@ -13,26 +13,26 @@ async def _check(email): "sec-ch-ua-mobile": "?1", "x-twitter-active-user": "yes", "origin": "https://x.com", - "sec-fetch-site": "same-site", - "sec-fetch-mode": "cors", - "sec-fetch-dest": "empty", - "referer": "https://x.com/", - "accept-language": "en-US,en;q=0.9", "priority": "u=1, i" } async with httpx.AsyncClient(http2=True) as client: try: response = await client.get(url, params=params, headers=headers) + + if response.status_code == 429: + return Result.error("Rate limited wait for few minutes") + data = response.json() - taken_bool= data["taken"] + taken_bool = data.get("taken") + if taken_bool is True: return Result.taken() - - elif taken_bool is False: + else: return Result.available() + except Exception as e: - return Result.error(e) + return Result.error(str(e)) def validate_x(email: str) -> Result: return _check(email) From 0d98fe5ce58c31613b4a4b5bc67ce18df684c52a Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 18:19:32 +0530 Subject: [PATCH 15/30] fix(instagram.py): improve error handling and extended headers --- user_scanner/email_scan/social/instagram.py | 29 ++++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/user_scanner/email_scan/social/instagram.py b/user_scanner/email_scan/social/instagram.py index 32d7774..698ae38 100644 --- a/user_scanner/email_scan/social/instagram.py +++ b/user_scanner/email_scan/social/instagram.py @@ -1,6 +1,7 @@ import httpx from user_scanner.core.result import Result + async def _check(email): USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36" async with httpx.AsyncClient(headers={"user-agent": USER_AGENT}, http2=True) as client: @@ -9,9 +10,21 @@ async def _check(email): headers = { "x-csrftoken": csrf, - "x-ig-app-id": "936619743392459", - "x-requested-with": "XMLHttpRequest", - "referer": "https://www.instagram.com/accounts/password/reset/" + 'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + 'Accept-Encoding': "identity", + 'sec-ch-ua-full-version-list': "\"Google Chrome\";v=\"143.0.7499.146\", \"Chromium\";v=\"143.0.7499.146\", \"Not A(Brand\";v=\"24.0.0.0\"", + 'sec-ch-ua-platform': "\"Linux\"", + 'sec-ch-ua': "\"Google Chrome\";v=\"143\", \"Chromium\";v=\"143\", \"Not A(Brand\";v=\"24\"", + 'sec-ch-ua-model': "\"\"", + 'sec-ch-ua-mobile': "?0", + 'x-ig-app-id': "936619743392459", + 'x-requested-with': "XMLHttpRequest", + 'x-instagram-ajax': "1031566424", + 'x-asbd-id': "359341", + 'x-ig-www-claim': "0", + 'sec-ch-ua-platform-version': "\"\"", + 'origin': "https://www.instagram.com", + 'referer': "https://www.instagram.com/accounts/password/reset/" } response = await client.post( @@ -21,7 +34,15 @@ async def _check(email): ) data = response.json() - return Result.taken() if data.get("status") == "ok" else Result.available() + status_val = data.get("status") + if status_val == "ok": + return Result.taken() + elif status_val == "fail": + return Result.available() + else: + return Result.error("Unexpected response body, report it on github") + + def validate_instagram(email: str) -> Result: return _check(email) From 4089bd2a87036386222d57a730c2c805e550d917 Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 22:31:19 +0530 Subject: [PATCH 16/30] add: mastodon email validator --- user_scanner/email_scan/social/mastodon.py | 53 ++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 user_scanner/email_scan/social/mastodon.py diff --git a/user_scanner/email_scan/social/mastodon.py b/user_scanner/email_scan/social/mastodon.py new file mode 100644 index 0000000..217a8e2 --- /dev/null +++ b/user_scanner/email_scan/social/mastodon.py @@ -0,0 +1,53 @@ +import httpx +import re +from user_scanner.core.result import Result + + +async def _check(email): + base_url = "https://mastodon.social" + signup_url = f"{base_url}/auth/sign_up" + post_url = f"{base_url}/auth" + + headers = { + "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", + "referer": "https://mastodon.social/explore", + "origin": "https://mastodon.social" + } + + async with httpx.AsyncClient(http2=True, headers=headers, follow_redirects=True) as client: + try: + initial_resp = await client.get(signup_url) + if initial_resp.status_code != 200: + return Result.error(f"Failed to access signup page: {initial_resp.status_code}") + + token_match = re.search( + r'name="csrf-token" content="([^"]+)"', initial_resp.text) + if not token_match: + return Result.error("Could not find authenticity token") + + csrf_token = token_match.group(1) + + payload = { + "authenticity_token": csrf_token, + "user[account_attributes][username]": "no3motions_robot_020102", + "user[email]": email, + "user[password]": "Theleftalone@me", + "user[password_confirmation]": "Theleftalone@me", + "user[agreement]": "1", + "button": "" + } + + response = await client.post(post_url, data=payload) + + if "has already been taken" in response.text: + return Result.taken() + else: + return Result.available() + + except Exception as e: + return Result.error(str(e)) + + +def validate_mastodon(email: str) -> Result: + return _check(email) From 7343fc15f35dc23de22c64b41ecebe082a8e0ec0 Mon Sep 17 00:00:00 2001 From: kaifcodec Date: Fri, 2 Jan 2026 22:40:33 +0530 Subject: [PATCH 17/30] fix(email_orch): uses concurrency for full scans, prints header for every category --- user_scanner/core/email_orchestrator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/user_scanner/core/email_orchestrator.py b/user_scanner/core/email_orchestrator.py index 75fb62b..2506869 100644 --- a/user_scanner/core/email_orchestrator.py +++ b/user_scanner/core/email_orchestrator.py @@ -7,7 +7,7 @@ from user_scanner.core.result import Result -MAX_CONCURRENT_REQUESTS = 10 +MAX_CONCURRENT_REQUESTS = 15 async def _async_worker(module: ModuleType, email: str, sem: asyncio.Semaphore) -> Result: @@ -26,7 +26,7 @@ async def _async_worker(module: ModuleType, email: str, sem: asyncio.Semaphore) except Exception as e: result = Result.error(e) - cat_name = find_category(module) or "Email" + cat_name = find_category(module) result.update( site_name=module_name.capitalize(), username=email, From 101f81df5fec8a33d67222ffd714b97b3acd0936 Mon Sep 17 00:00:00 2001 From: Vamato <62358492+VamatoHD@users.noreply.github.com> Date: Fri, 2 Jan 2026 17:24:17 +0000 Subject: [PATCH 18/30] update requirements.txt --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 0bb9a42..016a6bc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -httpx +httpx[http2] colorama From a995b182c84419676f3d26804b3416360cc70bc7 Mon Sep 17 00:00:00 2001 From: Kaif Date: Fri, 2 Jan 2026 23:15:55 +0530 Subject: [PATCH 19/30] rm (github.py): Its non-functional and wasn't not correctly handled --- user_scanner/email_scan/dev/github.py | 46 --------------------------- 1 file changed, 46 deletions(-) delete mode 100644 user_scanner/email_scan/dev/github.py diff --git a/user_scanner/email_scan/dev/github.py b/user_scanner/email_scan/dev/github.py deleted file mode 100644 index 5c37eaa..0000000 --- a/user_scanner/email_scan/dev/github.py +++ /dev/null @@ -1,46 +0,0 @@ -import asyncio -import httpx -import re - -async def check_email(email): - headers = { - "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", - "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", - "referer": "https://github.com/signup" - } - - async with httpx.AsyncClient(headers=headers, http2=True, follow_redirects=True) as client: - res = await client.get("https://github.com/signup") - - token_match = re.search(r'name="authenticity_token" value="([^"]+)"', res.text) - if not token_match: - return "Token not found" - - token = token_match.group(1) - - data = { - "authenticity_token": token, - "value": email - } - - post_headers = {"accept": "*/*", "x-requested-with": "XMLHttpRequest"} - response = await client.post("https://github.com/email_validity_checks", data=data, headers=post_headers) - - if "The email you have provided is already associated with an account." not in response.text: - return f"[+] {email} is not REGISTERED" - else: - return f"[-] {email} is Registerd" - -async def main(): - email_to_test = input("Enter the email: ") - result = await check_email(email_to_test) - print(result) - -if __name__ == "__main__": - asyncio.run(main()) - - - - - - From 7659305027127aa7ed55bef3462fee728dce6fc7 Mon Sep 17 00:00:00 2001 From: Kaif Date: Fri, 2 Jan 2026 23:23:56 +0530 Subject: [PATCH 20/30] fix(email_orch): uses concurrency for full scans, prints header for every category --- user_scanner/core/email_orchestrator.py | 39 ++++++++++++++++++++----- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/user_scanner/core/email_orchestrator.py b/user_scanner/core/email_orchestrator.py index 2506869..9f6429f 100644 --- a/user_scanner/core/email_orchestrator.py +++ b/user_scanner/core/email_orchestrator.py @@ -2,17 +2,18 @@ from pathlib import Path from typing import List from types import ModuleType +from colorama import Fore, Style from user_scanner.core.helpers import load_categories, load_modules, find_category from user_scanner.core.result import Result - +# Concurrency control MAX_CONCURRENT_REQUESTS = 15 async def _async_worker(module: ModuleType, email: str, sem: asyncio.Semaphore) -> Result: async with sem: - module_name = module.__name__.split(".")[-1] + module_name = module.__name__.split('.')[-1] func_name = f"validate_{module_name}" if not hasattr(module, func_name): @@ -26,12 +27,14 @@ async def _async_worker(module: ModuleType, email: str, sem: asyncio.Semaphore) except Exception as e: result = Result.error(e) - cat_name = find_category(module) + # Use helper to get actual dir name for the Result object + actual_cat = find_category(module) or "Email" + result.update( site_name=module_name.capitalize(), username=email, - category=cat_name, - is_email=True, + category=actual_cat, + is_email=True ) print(result.get_console_output()) @@ -41,14 +44,12 @@ async def _async_worker(module: ModuleType, email: str, sem: asyncio.Semaphore) async def _run_batch(modules: List[ModuleType], emails: List[str]) -> List[Result]: sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) tasks = [] - for module in modules: for email in emails: tasks.append(_async_worker(module, email, sem)) if not tasks: return [] - return list(await asyncio.gather(*tasks)) @@ -57,6 +58,9 @@ def run_email_module_batch(module: ModuleType, emails: List[str]) -> List[Result def run_email_category_batch(category_path: Path, emails: List[str]) -> List[Result]: + cat_name = category_path.name.upper() + print(f"\n{Fore.MAGENTA}== {cat_name} SITES =={Style.RESET_ALL}") + modules = load_modules(category_path) return asyncio.run(_run_batch(modules, emails)) @@ -65,9 +69,28 @@ def run_email_full_batch(emails: List[str]) -> List[Result]: categories = load_categories(is_email=True) all_results = [] - for cat_path in categories.values(): + for cat_name, cat_path in categories.items(): + print(f"\n{Fore.MAGENTA}== {cat_name.upper()} SITES =={Style.RESET_ALL}") + modules = load_modules(cat_path) cat_results = asyncio.run(_run_batch(modules, emails)) all_results.extend(cat_results) return all_results + + + + + + + + + + + + + + + + + From e62e93535859d6b4a532d44a323992a42f188040 Mon Sep 17 00:00:00 2001 From: Kaif Date: Fri, 2 Jan 2026 23:25:11 +0530 Subject: [PATCH 21/30] clean: email_orchestrator.py, removed new lines --- user_scanner/core/email_orchestrator.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/user_scanner/core/email_orchestrator.py b/user_scanner/core/email_orchestrator.py index 9f6429f..9b42bff 100644 --- a/user_scanner/core/email_orchestrator.py +++ b/user_scanner/core/email_orchestrator.py @@ -77,20 +77,3 @@ def run_email_full_batch(emails: List[str]) -> List[Result]: all_results.extend(cat_results) return all_results - - - - - - - - - - - - - - - - - From c25989c457b7e82973b66beb4995d0b4b4ae7d91 Mon Sep 17 00:00:00 2001 From: Vamato <62358492+VamatoHD@users.noreply.github.com> Date: Sat, 3 Jan 2026 17:28:04 +0000 Subject: [PATCH 22/30] refactor __main__ --- tests/test_orchestrator.py | 4 +- user_scanner/__main__.py | 72 +++++++++------------- user_scanner/core/email_orchestrator.py | 19 +++--- user_scanner/core/orchestrator.py | 24 +++----- user_scanner/email_scan/social/mastodon.py | 2 +- user_scanner/email_scan/social/x.py | 3 +- 6 files changed, 52 insertions(+), 72 deletions(-) diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index 15e6dd3..418c525 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -43,7 +43,7 @@ def validate_testsite(username): setattr(module, "validate_testsite", validate_testsite) - orchestrator.run_module_single(module, "bob") + orchestrator.run_user_module(module, "bob") out = capsys.readouterr().out assert 'bob' in out #Needs to be improved @@ -63,7 +63,7 @@ def validate(username): monkeypatch.setattr(orchestrator, "load_modules", lambda p: [module]) monkeypatch.setattr(orchestrator, "get_site_name", lambda m: "Testsite") - results = orchestrator.run_checks_category(tmp_path, "someone") + results = orchestrator.run_user_category(tmp_path, "someone") assert isinstance(results, list) assert len(results) == 1 assert results[0].to_number() == 0 # TAKEN diff --git a/user_scanner/__main__.py b/user_scanner/__main__.py index 63b409c..d4a97d9 100644 --- a/user_scanner/__main__.py +++ b/user_scanner/__main__.py @@ -3,10 +3,8 @@ import sys import re from colorama import Fore, Style -from typing import List from user_scanner.cli.banner import print_banner -from user_scanner.core.result import Result from user_scanner.core.version import load_local_version from user_scanner.core import formatter from user_scanner.utils.updater_logic import check_for_updates @@ -16,15 +14,14 @@ load_categories, load_modules, find_module, - is_last_value, get_site_name ) from user_scanner.core.orchestrator import ( generate_permutations, - run_checks, - run_checks_category, - run_module_single + run_user_full, + run_user_category, + run_user_module ) from user_scanner.core.email_orchestrator import ( @@ -130,55 +127,42 @@ def main(): results = [] - if is_email: - if args.module: - modules = find_module(args.module, is_email=True) - if modules: - for module in modules: - results.extend(run_email_module_batch(module, targets)) - else: - print( - R + f"[!] Email module '{args.module}' not found." + Style.RESET_ALL) - elif args.category: - cat_path = load_categories(is_email=True).get(args.category) - if cat_path: - results.extend(run_email_category_batch(cat_path, targets)) - else: - print( - R + f"[!] Email category '{args.category}' not found." + Style.RESET_ALL) + for i, target in enumerate(targets): + if i != 0 and args.delay: + time.sleep(args.delay) + + if is_email: + print(f"\n{Fore.CYAN} Checking email: {target}{Style.RESET_ALL}") else: - results = run_email_full_batch(targets) - else: - def run_all_targets(func, arg=None) -> List[Result]: - all_results = [] - for i, name in enumerate(targets): - is_last = is_last_value(targets, i) - res = func(name) if arg is None else func(arg, name) - if isinstance(res, list): - all_results.extend(res) - else: - all_results.append(res) - if args.delay > 0 and not is_last: - time.sleep(args.delay) - return all_results + print(f"\n{Fore.CYAN} Checking username: {target}{Style.RESET_ALL}") if args.module: - modules = find_module(args.module, is_email=False) + modules = find_module(args.module, is_email) + fn = run_email_module_batch if is_email else run_user_module if modules: - for mod in modules: - results.extend(run_all_targets(run_module_single, mod)) + for module in modules: + results.extend(fn(module, target)) else: print( - R + f"[!] Module '{args.module}' not found." + Style.RESET_ALL) + R + + f"[!] {'Email' if is_email else 'User'} module '{args.module}' not found." + + Style.RESET_ALL + ) + elif args.category: - cat_path = load_categories(is_email=False).get(args.category) + cat_path = load_categories(is_email).get(args.category) + fn = run_email_category_batch if is_email else run_user_category if cat_path: - results.extend(run_all_targets(run_checks_category, cat_path)) + results.extend(fn(cat_path, target)) else: print( - R + f"[!] Category '{args.category}' not found." + Style.RESET_ALL) + R + + f"[!] {'Email' if is_email else 'User'} category '{args.module}' not found." + + Style.RESET_ALL + ) else: - results = run_all_targets(run_checks) + fn = run_email_full_batch if is_email else run_user_full + results.extend(fn(target)) if args.output: if args.format == "console": diff --git a/user_scanner/core/email_orchestrator.py b/user_scanner/core/email_orchestrator.py index 9b42bff..77dceb7 100644 --- a/user_scanner/core/email_orchestrator.py +++ b/user_scanner/core/email_orchestrator.py @@ -41,31 +41,30 @@ async def _async_worker(module: ModuleType, email: str, sem: asyncio.Semaphore) return result -async def _run_batch(modules: List[ModuleType], emails: List[str]) -> List[Result]: +async def _run_batch(modules: List[ModuleType], email:str) -> List[Result]: sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) tasks = [] for module in modules: - for email in emails: - tasks.append(_async_worker(module, email, sem)) + tasks.append(_async_worker(module, email, sem)) if not tasks: return [] return list(await asyncio.gather(*tasks)) -def run_email_module_batch(module: ModuleType, emails: List[str]) -> List[Result]: - return asyncio.run(_run_batch([module], emails)) +def run_email_module_batch(module: ModuleType, email: str) -> List[Result]: + return asyncio.run(_run_batch([module], email)) -def run_email_category_batch(category_path: Path, emails: List[str]) -> List[Result]: - cat_name = category_path.name.upper() +def run_email_category_batch(category_path: Path, email: str) -> List[Result]: + cat_name = category_path.stem.capitalize() print(f"\n{Fore.MAGENTA}== {cat_name} SITES =={Style.RESET_ALL}") modules = load_modules(category_path) - return asyncio.run(_run_batch(modules, emails)) + return asyncio.run(_run_batch(modules, email)) -def run_email_full_batch(emails: List[str]) -> List[Result]: +def run_email_full_batch(email: str) -> List[Result]: categories = load_categories(is_email=True) all_results = [] @@ -73,7 +72,7 @@ def run_email_full_batch(emails: List[str]) -> List[Result]: print(f"\n{Fore.MAGENTA}== {cat_name.upper()} SITES =={Style.RESET_ALL}") modules = load_modules(cat_path) - cat_results = asyncio.run(_run_batch(modules, emails)) + cat_results = asyncio.run(_run_batch(modules, email)) all_results.extend(cat_results) return all_results diff --git a/user_scanner/core/orchestrator.py b/user_scanner/core/orchestrator.py index 3cea3b0..cebefe5 100644 --- a/user_scanner/core/orchestrator.py +++ b/user_scanner/core/orchestrator.py @@ -5,10 +5,11 @@ from pathlib import Path from user_scanner.core.result import Result from typing import Callable, List +from types import ModuleType from user_scanner.core.helpers import find_category, get_site_name, load_categories, load_modules -def worker_single(module, username: str) -> Result: +def _worker_single(module: ModuleType, username: str) -> Result: func = next((getattr(module, f) for f in dir(module) if f.startswith("validate_") and callable(getattr(module, f))), None) @@ -23,10 +24,9 @@ def worker_single(module, username: str) -> Result: return result except Exception as e: return Result.error(e, site_name=site_name, username=username) - - -def run_module_single(module, username: str) -> List[Result]: - result = worker_single(module, username) + +def run_user_module(module: ModuleType, username: str) -> List[Result]: + result = _worker_single(module, username) category = find_category(module) if category: @@ -37,16 +37,15 @@ def run_module_single(module, username: str) -> List[Result]: return [result] -def run_checks_category(category_path: Path, username: str) -> List[Result]: - modules = load_modules(category_path) - +def run_user_category(category_path: Path, username: str) -> List[Result]: category_name = category_path.stem.capitalize() print(f"\n{Fore.MAGENTA}== {category_name} SITES =={Style.RESET_ALL}") results = [] + modules = load_modules(category_path) with ThreadPoolExecutor(max_workers=10) as executor: - exec_map = executor.map(lambda m: worker_single(m, username), modules) + exec_map = executor.map(lambda m: _worker_single(m, username), modules) for result in exec_map: result.update(category=category_name) results.append(result) @@ -56,15 +55,12 @@ def run_checks_category(category_path: Path, username: str) -> List[Result]: return results -def run_checks(username: str) -> List[Result]: - - print(f"\n{Fore.CYAN} Checking username: {username}{Style.RESET_ALL}") - +def run_user_full(username: str) -> List[Result]: results = [] categories = list(load_categories().values()) for category_path in categories: - temp = run_checks_category(category_path, username) + temp = run_user_category(category_path, username) results.extend(temp) return results diff --git a/user_scanner/email_scan/social/mastodon.py b/user_scanner/email_scan/social/mastodon.py index 217a8e2..0685438 100644 --- a/user_scanner/email_scan/social/mastodon.py +++ b/user_scanner/email_scan/social/mastodon.py @@ -46,7 +46,7 @@ async def _check(email): return Result.available() except Exception as e: - return Result.error(str(e)) + return Result.error(e) def validate_mastodon(email: str) -> Result: diff --git a/user_scanner/email_scan/social/x.py b/user_scanner/email_scan/social/x.py index 2a3599c..9033a1e 100644 --- a/user_scanner/email_scan/social/x.py +++ b/user_scanner/email_scan/social/x.py @@ -32,7 +32,8 @@ async def _check(email): return Result.available() except Exception as e: - return Result.error(str(e)) + return Result.error(e) + def validate_x(email: str) -> Result: return _check(email) From 730719d4fe83277588a95576456813a3f4f8db76 Mon Sep 17 00:00:00 2001 From: Vamato <62358492+VamatoHD@users.noreply.github.com> Date: Sat, 3 Jan 2026 17:54:29 +0000 Subject: [PATCH 23/30] update(result.py): json now also outputs email --- user_scanner/core/result.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/user_scanner/core/result.py b/user_scanner/core/result.py index cd6dd23..bf4a8c3 100644 --- a/user_scanner/core/result.py +++ b/user_scanner/core/result.py @@ -7,6 +7,7 @@ username: "{username}", site_name: "{site_name}", category: "{category}", + is_email: "{is_email}" }}""" JSON_TEMPLATE = """{{ @@ -106,14 +107,18 @@ def as_dict(self) -> dict: "reason": self.get_reason(), "username": self.username, "site_name": self.site_name, - "category": self.category + "category": self.category, + "is_email": self.is_email } def debug(self) -> str: return DEBUG_MSG.format(**self.as_dict()) def to_json(self) -> str: - return JSON_TEMPLATE.format(**self.as_dict()) + msg = JSON_TEMPLATE.format(**self.as_dict()) + if self.is_email: + msg = msg.replace("\t\"username\":", "\t\"email\":") + return msg def to_csv(self) -> str: return CSV_TEMPLATE.format(**self.as_dict()) From 080c1fff5cd1f54ef32b8ff3a1986dff4129812c Mon Sep 17 00:00:00 2001 From: Vamato <62358492+VamatoHD@users.noreply.github.com> Date: Fri, 9 Jan 2026 17:28:40 +0000 Subject: [PATCH 24/30] move generate_permutations into helpers.py --- tests/test_helpers.py | 34 +++++++++++++++++++++++++++++++ tests/test_orchestrator.py | 22 -------------------- user_scanner/__main__.py | 4 ++-- user_scanner/core/helpers.py | 33 +++++++++++++++++++++++++++--- user_scanner/core/orchestrator.py | 30 --------------------------- 5 files changed, 66 insertions(+), 57 deletions(-) create mode 100644 tests/test_helpers.py diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..bd46eec --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,34 @@ +from user_scanner.core import helpers +from types import SimpleNamespace + +def test_generate_permutations(): + perms = helpers.generate_permutations("user", "ab", limit=None) + assert "user" in perms + # All permutations must be valid + assert all( + p == "user" or + (p.startswith("user") and len(p) > len("user")) + for p in perms + ) + + assert len(perms) > 1 + +def test_generate_permutations_email(): + perms = helpers.generate_permutations("john@email.com", "abc", limit=None, is_email=True) + assert "john@email.com" in perms + assert all( + p == "john@email.com" or + (p.startswith("john") and len(p) > len("john@email.com") and p.endswith("@email.com")) + for p in perms + ) + assert len(perms) > 1 + +def test_get_site_name(): + def module(name:str) -> SimpleNamespace: + return SimpleNamespace(**{"__name__":name}) + assert helpers.get_site_name(module("X")) == "X (Twitter)" + assert helpers.get_site_name(module("x")) == "X (Twitter)" + assert helpers.get_site_name(module("user_scanner.github")) == "Github" + assert helpers.get_site_name(module("user_scanner.chess_com")) == "Chess.com" + + diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index 418c525..7b1b3c4 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -12,28 +12,6 @@ def test_status_validate_available(monkeypatch): assert res.to_number() == 1 # AVAILABLE -def test_generate_permutations(): - perms = orchestrator.generate_permutations("user", "ab", limit=None) - assert "user" in perms - # All permutations must be valid - assert all( - p == "user" or - (p.startswith("user") and len(p) > len("user")) - for p in perms - ) - - assert len(perms) > 1 - -def test_generate_permutations_email(): - perms = orchestrator.generate_permutations("john@email.com", "abc", limit=None, is_email=True) - assert "john@email.com" in perms - assert all( - p == "john@email.com" or - (p.startswith("john") and len(p) > len("john@email.com") and p.endswith("@email.com")) - for p in perms - ) - assert len(perms) > 1 - def test_run_module_single_prints_json_and_csv(capsys): module = types.ModuleType("fake.testsite") module.__file__ = "/fake/testsite.py" diff --git a/user_scanner/__main__.py b/user_scanner/__main__.py index d4a97d9..7485511 100644 --- a/user_scanner/__main__.py +++ b/user_scanner/__main__.py @@ -14,11 +14,11 @@ load_categories, load_modules, find_module, - get_site_name + get_site_name, + generate_permutations ) from user_scanner.core.orchestrator import ( - generate_permutations, run_user_full, run_user_category, run_user_module diff --git a/user_scanner/core/helpers.py b/user_scanner/core/helpers.py index 6d9327d..5ba6959 100644 --- a/user_scanner/core/helpers.py +++ b/user_scanner/core/helpers.py @@ -1,16 +1,18 @@ -# helpers.py import importlib import importlib.util +from itertools import permutations from types import ModuleType from pathlib import Path from typing import Dict, List + def get_site_name(module) -> str: name = module.__name__.split('.')[-1].capitalize().replace("_", ".") if name == "X": return "X (Twitter)" return name + def load_modules(category_path: Path) -> List[ModuleType]: modules = [] for file in category_path.glob("*.py"): @@ -64,5 +66,30 @@ def find_category(module: ModuleType) -> str | None: return None -def is_last_value(values, i: int) -> bool: - return i == len(values) - 1 +def generate_permutations(username: str, pattern: str, limit: int | None = None, is_email: bool = False) -> List[str]: + """ + Generate all order-based permutations of characters in `pattern` + appended after `username`. + """ + + if limit and limit <= 0: + return [] + + permutations_set = {username} + chars = list(pattern) + + domain = "" + if is_email: + username, domain = username.strip().split("@") + + # generate permutations of length 1 → len(chars) + for r in range(len(chars)): + for combo in permutations(chars, r): + new = username + ''.join(combo) + if is_email: + new += "@" + domain + permutations_set.add(new) + if limit and len(permutations_set) >= limit: + return sorted(permutations_set) + + return sorted(permutations_set) diff --git a/user_scanner/core/orchestrator.py b/user_scanner/core/orchestrator.py index cebefe5..cec1f9f 100644 --- a/user_scanner/core/orchestrator.py +++ b/user_scanner/core/orchestrator.py @@ -1,6 +1,5 @@ from colorama import Fore, Style from concurrent.futures import ThreadPoolExecutor -from itertools import permutations import httpx from pathlib import Path from user_scanner.core.result import Result @@ -120,32 +119,3 @@ def contains(a, b): return (isinstance(a, list) and b in a) or (a == b) return Result.error("Status didn't match. Report this on Github.") return generic_validate(url, inner, **kwargs) - - -def generate_permutations(username: str, pattern: str, limit: int | None = None, is_email: bool = False) -> List[str]: - """ - Generate all order-based permutations of characters in `pattern` - appended after `username`. - """ - - if limit and limit <= 0: - return [] - - permutations_set = {username} - chars = list(pattern) - - domain = "" - if is_email: - username, domain = username.strip().split("@") - - # generate permutations of length 1 → len(chars) - for r in range(len(chars)): - for combo in permutations(chars, r): - new = username + ''.join(combo) - if is_email: - new += "@" + domain - permutations_set.add(new) - if limit and len(permutations_set) >= limit: - return sorted(permutations_set) - - return sorted(permutations_set) From 17c703372d5fdea365ff330553aea65f67f53567 Mon Sep 17 00:00:00 2001 From: Vamato <62358492+VamatoHD@users.noreply.github.com> Date: Sat, 10 Jan 2026 16:04:08 +0000 Subject: [PATCH 25/30] removed console format --- user_scanner/__main__.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/user_scanner/__main__.py b/user_scanner/__main__.py index 7485511..127ee25 100644 --- a/user_scanner/__main__.py +++ b/user_scanner/__main__.py @@ -72,7 +72,7 @@ def main(): default=0, help="Delay between requests") parser.add_argument( - "-f", "--format", choices=["console", "csv", "json"], default="console", help="Output format") + "-f", "--format", choices=["csv", "json"], help="Output format") parser.add_argument("-o", "--output", type=str, help="Output file path") @@ -165,11 +165,6 @@ def main(): results.extend(fn(target)) if args.output: - if args.format == "console": - print( - R + f"\n[!] Console format cannot be saved to '{args.output}'." + Style.RESET_ALL) - return - content = formatter.into_csv( results) if args.format == "csv" else formatter.into_json(results) with open(args.output, "a", encoding="utf-8") as f: From b659d29e8a97f4bf4bad5ef5d4b226fec1b1d83c Mon Sep 17 00:00:00 2001 From: Vamato <62358492+VamatoHD@users.noreply.github.com> Date: Sat, 10 Jan 2026 16:20:00 +0000 Subject: [PATCH 26/30] updated README.md --- README.md | 78 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index d44e86d..2d91e72 100644 --- a/README.md +++ b/README.md @@ -12,15 +12,15 @@ --- -Scan a username across multiple social, developer, and creator platforms to see if it’s available. -Perfect for finding a **unique username** across GitHub, Twitter, Reddit, Instagram, and more, all in one command. +Scan a username or email across multiple social, developer, and creator platforms to see if it’s available. +Perfect for finding a **unique username or email** across GitHub, Twitter, Reddit, Instagram, and more, all in one command. -### Features +## Features - ✅ Check usernames across **social networks**, **developer platforms**, and **creator communities** -- ✅ Can be used as a username OSINT tool -- ✅ Smart auto-update system, Detects new releases on PyPI and interactively prompts the user to upgrade. +- ✅ Can be used as a username and email OSINT tool +- ✅ Smart auto-update system detects new releases on PyPI and interactively prompts the user to upgrade. - ✅ Clear **Available / Taken / Error** output for each platform - ✅ Robust error handling: It prints the exact reason (e.g. Cannot use underscores, hyphens at the start/end) - ✅ Fully modular: add new platform modules easily @@ -31,7 +31,7 @@ Perfect for finding a **unique username** across GitHub, Twitter, Reddit, Instag - ✅ Very low and lightweight dependencies, can be run on any machine --- -### Installation +## Installation ```bash pip install user-scanner @@ -39,43 +39,59 @@ pip install user-scanner --- -### Usage +## Usage -Scan a username across all platforms: +### Basic username/email scan + +Scan a signle username across **all** available modules/platforms: ```bash -user-scanner -u +user-scanner -u john_doe +user-scanner --username john_doe # long version + +user-scanner -e john_doe@gmail.com +user-scanner --email john_doe@gmail.com # long version ``` -Optionally, scan a specific category or single module: + +### Selective scanning + +Scan only specific category or single modules: ```bash -user-scanner -u -c dev -user-scanner -l # Lists all available modules -user-scanner -u -m github +user-scanner -u john_doe -c dev # developer platforms only +user-scanner -u john_doe -m github # only Github ``` -Also, the output file and format can be specified:
+List all available modules/categories: ```bash -user-scanner -u -f console #Default format -user-scanner -u -f csv -user-scanner -u -f json -user-scanner -u -f -o +user-scanner -l +user-scanner --list # long version ``` -Generate multiple username variations by appending a suffix: +### Username/Email variations (only suffix) + +Generate & check usernames variations with a permutation from the given suffix: ```bash -user-scanner -u -p +user-scanner -u john_ -p ab #john_a, ..., john_ab, john_ba ``` -Optionally, scan a specific category or single module with limit: + +Control variation generation: ```bash -user-scanner -u -p -c dev -user-scanner -u -p -m github -user-scanner -u -p -s # limit generation of usernames -user-scanner -u -p -d # delay to avoid rate-limits (can be 0s-1s) +-s 30 # How many variations to generate +-d 1.0 #Delay between username checking ``` + +### Output control + +```bash +-f csv +-f json +-o result.json # output to file +``` + --- ### Update @@ -84,12 +100,11 @@ Update the tool to the latest PyPI version: ```bash user-scanner -U - ``` --- -### Screenshot: +## Screenshot: - Note*: New modules are constantly getting added so this might have only limited, outdated output: @@ -105,8 +120,9 @@ user-scanner -U user-scanner's JSON output screenshot +--- -### Contributing: +## Contributing: Modules are organized by category: @@ -134,20 +150,20 @@ See [CONTRIBUTING.md](CONTRIBUTING.md) for examples. --- -### Dependencies: +## Dependencies: - [httpx](https://pypi.org/project/httpx/) - [colorama](https://pypi.org/project/colorama/) --- -### License +## License This project is licensed under the **MIT License**. See [LICENSE](LICENSE) for details. --- -### Star History +## Star History From ee993fc51ff5f9384912f4b596821254ddeee48c Mon Sep 17 00:00:00 2001 From: Kaif Date: Mon, 12 Jan 2026 18:33:21 +0530 Subject: [PATCH 27/30] update(README.md): add instructions about new email scan mode --- README.md | 96 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 52 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 2d91e72..df0af23 100644 --- a/README.md +++ b/README.md @@ -12,23 +12,27 @@ --- -Scan a username or email across multiple social, developer, and creator platforms to see if it’s available. -Perfect for finding a **unique username or email** across GitHub, Twitter, Reddit, Instagram, and more, all in one command. +A powerful *Email OSINT tool* that checks if a specific email is registered on various sites, combined with *username scanning* — 2-in-1 solution. +Perfect for fast, accurate and lightweight email OSINT + +Perfect for finding a **unique username** across GitHub, Twitter, Reddit, Instagram, and more, all in a single command. ## Features -- ✅ Check usernames across **social networks**, **developer platforms**, and **creator communities** -- ✅ Can be used as a username and email OSINT tool -- ✅ Smart auto-update system detects new releases on PyPI and interactively prompts the user to upgrade. -- ✅ Clear **Available / Taken / Error** output for each platform -- ✅ Robust error handling: It prints the exact reason (e.g. Cannot use underscores, hyphens at the start/end) -- ✅ Fully modular: add new platform modules easily -- ✅ Wildcard-based username permutations for automatic variation generation using provided suffix -- ✅ Selection of results format (e.g. json, csv, console (default)) -- ✅ Get the scanning results in preferred format (json/csv) in specified output file (suitable for power users) -- ✅ Command-line interface ready: works directly after `pip install` -- ✅ Very low and lightweight dependencies, can be run on any machine +- ✅ Check an email across multiple sites to see if it’s registered. +- ✅ Scan usernames across **social networks**, **developer platforms**, **creator communities**, and more. +- ✅ Can be used purely as a username tool. +- ✅ Smart auto-update system detects new releases on PyPI and prompts the user to upgrade interactively. +- ✅ Clear `Registered` and `Not Registered` for email scanning `Available` / `Taken` / `Error` output for username scans +- ✅ Robust error handling: displays the exact reason a username or email cannot be used (e.g., underscores or hyphens at the start/end). +- ✅ Fully modular: easily add new platform modules. +- ✅ Wildcard-based username permutations for automatic variation generation using a provided suffix. +- ✅ Option to select results format (**JSON**, **CSV**, console). +- ✅ Save scanning and OSINT results in the preferred format and output file (ideal for power users). +- ✅ Command-line interface ready: works immediately after `pip install`. +- ✅ Lightweight with minimal dependencies; runs on any machine. + --- ## Installation @@ -43,54 +47,54 @@ pip install user-scanner ### Basic username/email scan -Scan a signle username across **all** available modules/platforms: +Scan a single username across **all** available modules/platforms: ```bash +user-scanner -e john_doe@gmail.com +user-scanner --email john_doe@gmail.com # long version + user-scanner -u john_doe user-scanner --username john_doe # long version -user-scanner -e john_doe@gmail.com -user-scanner --email john_doe@gmail.com # long version ``` ### Selective scanning -Scan only specific category or single modules: +Scan only specific categories or single modules: ```bash user-scanner -u john_doe -c dev # developer platforms only -user-scanner -u john_doe -m github # only Github +user-scanner -u john_doe -m github # only GitHub + ``` List all available modules/categories: ```bash user-scanner -l -user-scanner --list # long version -``` - -### Username/Email variations (only suffix) -Generate & check usernames variations with a permutation from the given suffix: - -```bash -user-scanner -u john_ -p ab #john_a, ..., john_ab, john_ba ``` -Control variation generation: +### Username/Email variations (suffix only) + +Generate & check username variations using a permutation from the given suffix: ```bash --s 30 # How many variations to generate --d 1.0 #Delay between username checking +user-scanner -u john_ -p ab # john_a, ..., john_ab, john_ba ``` -### Output control +## Important Flags -```bash --f csv --f json --o result.json # output to file -``` +| Flag | Description | +|------|-------------| +| `-c, --category CATEGORY` | Scan all platforms in a specific category | +| `-l, --list` | List all available modules for username scanning | +| `-m, --module MODULE` | Scan a single specific module | +| `-p, --permute PERMUTE` | Generate username permutations using a pattern/suffix | +| `-s, --stop STOP` | Limit the number of permutations generated | +| `-d, --delay DELAY` | Delay (in seconds) between requests | +| `-f, --format {csv,json}` | Select output format | +| `-o, --output OUTPUT` | Save results to a file | --- @@ -101,7 +105,6 @@ Update the tool to the latest PyPI version: ```bash user-scanner -U ``` - --- ## Screenshot: @@ -122,18 +125,23 @@ user-scanner -U --- -## Contributing: +## Contributing -Modules are organized by category: +Modules are organized under `user_scanner/`: ``` user_scanner/ -├── dev/ # Developer platforms (GitHub, GitLab, etc.) -├── social/ # Social platforms (Twitter/X, Reddit, Instagram, etc.) -├── creator/ # Creator platforms (Hashnode, Dev.to, Medium, etc.) -├── community/ # Community platforms (forums, niche sites) -├── gaming/ # Gaming sites (chess.com, roblox, monkeytype etc.) -├── donation/ # Donation taking sites (buymeacoffe.com, similar...) +├── email_scan/ # Currently in development +│ └── social/ # Social email scan modules (Instagram, Mastodon, X, etc.) +| ... # New sites to be added soon +├── user_scan/ +│ ├── dev/ # Developer platforms (GitHub, GitLab, npm, etc.) +│ ├── social/ # Social platforms (Twitter/X, Reddit, Instagram, Discord, etc.) +│ ├── creator/ # Creator platforms (Hashnode, Dev.to, Medium, Patreon, etc.) +│ ├── community/ # Community platforms (forums, StackOverflow, HackerNews, etc.) +│ ├── gaming/ # Gaming sites (chess.com, Lichess, Roblox, Minecraft, etc.) +│ └── donation/ # Donation platforms (BuyMeACoffee, Liberapay) +|... ``` **Module guidelines:** From 54b5a8fc248d3cddc18f63cbfa15522489cdd25b Mon Sep 17 00:00:00 2001 From: json-hunter07 Date: Sun, 18 Jan 2026 00:37:52 +0530 Subject: [PATCH 28/30] add: new `porn` category and email validation function for Pornhub --- user_scanner/email_scan/__init__.py | 0 user_scanner/email_scan/porn/pornhub.py | 62 ++++++++++++++++++++++ user_scanner/email_scan/social/__init__.py | 0 3 files changed, 62 insertions(+) create mode 100644 user_scanner/email_scan/__init__.py create mode 100644 user_scanner/email_scan/porn/pornhub.py create mode 100644 user_scanner/email_scan/social/__init__.py diff --git a/user_scanner/email_scan/__init__.py b/user_scanner/email_scan/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/user_scanner/email_scan/porn/pornhub.py b/user_scanner/email_scan/porn/pornhub.py new file mode 100644 index 0000000..33c8b14 --- /dev/null +++ b/user_scanner/email_scan/porn/pornhub.py @@ -0,0 +1,62 @@ +import httpx +import re +from user_scanner.core.result import Result + +async def _check(email: str) -> Result: + base_url = "https://www.pornhub.com" + check_api = f"{base_url}/api/v1/user/create_account_check" + + headers = { + "user-agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Mobile Safari/537.36", + "x-requested-with": "XMLHttpRequest", + "origin": base_url, + "referer": base_url + "/", + "content-type": "application/x-www-form-urlencoded; charset=UTF-8" + } + + async with httpx.AsyncClient(http2=True, follow_redirects=True, timeout=3) as client: + try: + landing_resp = await client.get(base_url, headers=headers) + token_match = re.search(r'var\s+token\s*=\s*"([^"]+)"', landing_resp.text) + + if not token_match: + return Result.error("Failed to extract dynamic token from HTML") + + token = token_match.group(1) + + params = {"token": token} + payload = { + "check_what": "email", + "email": email + } + + response = await client.post( + check_api, + params=params, + headers=headers, + data=payload + ) + + if response.status_code == 429: + return Result.error("Rate limited, wait for a few minutes") + + if response.status_code != 200: + return Result.error(f"HTTP Error: {response.status_code}") + + data = response.json() + status = data.get("email") + error_msg = data.get("error_message", "") + + if status == "create_account_passed": + return Result.available() + elif "already in use" in error_msg.lower() or status != "create_account_passed": + return Result.taken() + else: + return Result.error(f"Unexpected API response: {status}") + + except Exception as e: + return Result.error(e) + + +async def validate_pornhub(email: str) -> Result: + return await _check(email) diff --git a/user_scanner/email_scan/social/__init__.py b/user_scanner/email_scan/social/__init__.py new file mode 100644 index 0000000..e69de29 From a6c3044f533ccc531ffcf8d66d7faca8778b5673 Mon Sep 17 00:00:00 2001 From: json-hunter07 Date: Sun, 18 Jan 2026 01:12:30 +0530 Subject: [PATCH 29/30] fix(social): convert social modules' functions to async --- user_scanner/email_scan/social/instagram.py | 4 ++-- user_scanner/email_scan/social/mastodon.py | 4 ++-- user_scanner/email_scan/social/x.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/user_scanner/email_scan/social/instagram.py b/user_scanner/email_scan/social/instagram.py index 698ae38..749b05b 100644 --- a/user_scanner/email_scan/social/instagram.py +++ b/user_scanner/email_scan/social/instagram.py @@ -44,5 +44,5 @@ async def _check(email): -def validate_instagram(email: str) -> Result: - return _check(email) +async def validate_instagram(email: str) -> Result: + return await _check(email) diff --git a/user_scanner/email_scan/social/mastodon.py b/user_scanner/email_scan/social/mastodon.py index 0685438..6084339 100644 --- a/user_scanner/email_scan/social/mastodon.py +++ b/user_scanner/email_scan/social/mastodon.py @@ -49,5 +49,5 @@ async def _check(email): return Result.error(e) -def validate_mastodon(email: str) -> Result: - return _check(email) +async def validate_mastodon(email: str) -> Result: + return await _check(email) diff --git a/user_scanner/email_scan/social/x.py b/user_scanner/email_scan/social/x.py index 9033a1e..8a6dd94 100644 --- a/user_scanner/email_scan/social/x.py +++ b/user_scanner/email_scan/social/x.py @@ -35,5 +35,5 @@ async def _check(email): return Result.error(e) -def validate_x(email: str) -> Result: - return _check(email) +async def validate_x(email: str) -> Result: + return await _check(email) From 6f4449f9e3fbada26c9c16e4bc87fdeb17b7afd9 Mon Sep 17 00:00:00 2001 From: Jason Hunter Date: Sun, 18 Jan 2026 23:53:34 +0530 Subject: [PATCH 30/30] update(CONTRIBUTING.md): Enhance with detailed guidelines as per #125 Expanded contributing guidelines with module naming conventions, best practices for email modules, and validator function requirements. --- CONTRIBUTING.md | 164 +++++++++++++++++++++++++++++------------------- 1 file changed, 99 insertions(+), 65 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8197c87..6367731 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,55 +1,113 @@ # Contributing to user-scanner -Thanks for contributing! This guide explains how to add or modify platform validators correctly, and it includes the orchestrator helpers (generic_validate and status_validate) used to keep validators small and consistent. - --- -## Overview +This project separates two kinds of checks: + +- Username availability checks (under `user_scanner/user_scan/*`) — synchronous validators that the main username scanner uses. +- Email OSINT checks (under `user_scanner/email_scan/`) — asynchronous, multi-step flows that probe signup pages or email-focused APIs. Put email-focused modules in `user_scanner/email_scan/` (subfolders like `social/`, `dev/`, `community`, `creator` etc. are fine — follow the existing tree). + -This project contains small "validator" modules that check whether a username exists on a given platform. Each validator is a single function that returns a Result object (see core/orchestrator.py). +--- -Result semantics: -- Result.available() → available -- Result.taken() → taken -- Result.error(message: Optional[str]) → error, blocked, unknown, or request failure (include short diagnostic message when helpful) +## Module naming for both `email_scan` and `user_scan` modules -Follow this document when adding or updating validators. +- File name must be the platform name in lowercase (no spaces or special characters). + - Examples: `github.py`, `reddit.py`, `x.py`, `pinterest.py` --- -## Folder structure -- `social/` -> Social media platforms (Instagram, Reddit, X, etc.) -- `dev/` -> Developer platforms (GitHub, GitLab, Kaggle, etc.) -- `community/` -> Miscellaneous or community-specific platforms -- Add new directories for new categories as needed. +## Email-scan (email_scan) — guide for contributors -Example: -``` -user_scanner/ -├── social/ -| └── reddit.py -| ... -├── dev/ -| └── launchpad.py -| ... -└── core/ - └── orchestrator.py - ... -``` +Minimal best-practices checklist for email modules +- [ ] Put file in `user_scanner/email_scan//service.py`. +- [ ] Export `async def validate_(email: str) -> Result`. +- [ ] Use `httpx.AsyncClient` for requests, with sensible timeouts and follow_redirects when needed. +- [ ] Add a short docstring describing environment variables (api keys), rate limits, and responsible-use note (if required) -Place each new module in the most relevant folder. +### Example: Mastodon async example: ---- +```python name=user_scanner/email_scan/social/mastodon.py +import httpx +import re +from user_scanner.core.result import Result -## Module naming -- File name must be the platform name in lowercase (no spaces or special characters). - - Examples: `github.py`, `reddit.py`, `x.py`, `pinterest.py` +async def _check(email: str) -> Result: + """ + Internal helper that performs the multi-step signup probe. + Returns: Result.available(), Result.taken(), or Result.error(msg) + """ + base_url = "https://mastodon.social" + signup_url = f"{base_url}/auth/sign_up" + post_url = f"{base_url}/auth" + + headers = { + "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", + "referer": "https://mastodon.social/explore", + "origin": "https://mastodon.social", + } + + async with httpx.AsyncClient(http2=True, headers=headers, follow_redirects=True) as client: + try: + initial_resp = await client.get(signup_url, timeout=15.0) + if initial_resp.status_code != 200: + return Result.error(f"Failed to access signup page: {initial_resp.status_code}") + + # Look for CSRF/auth token in the signup HTML + token_match = re.search(r'name="csrf-token" content="([^"]+)"', initial_resp.text) + if not token_match: + return Result.error("Could not find authenticity token") + + csrf_token = token_match.group(1) + + # Use a dummy username & password for the signup probe + payload = { + "authenticity_token": csrf_token, + "user[account_attributes][username]": "no3motions_robot_020102", + "user[email]": email, + "user[password]": "Theleftalone@me", + "user[password_confirmation]": "Theleftalone@me", + "user[agreement]": "1", + "button": "" + } + + response = await client.post(post_url, data=payload, timeout=15.0) + + # Check the response HTML for the "already taken" phrase + if "has already been taken" in response.text: + return Result.taken() + else: + # If the response does not show taken, treat as available. + # Be aware: services can change wording; prefer explicit checks. + return Result.available() + + except Exception as exc: + # Convert exception to string so Result.error is stable/serializable + return Result.error(str(exc)) + + +async def validate_mastodon(email: str) -> Result: + """ + Public validator used by the email mode. + - Do basic local validation before network calls. + - Return Result.* helpers described above. + """ + if not EMAIL_RE.match(email): + return Result.error("Invalid email format") + return await _check(email) +``` + --- -## Validator function +## Username availability check guide: + + +### Validator function (user_scan/) Each module must expose exactly one validator function named: @@ -66,7 +124,7 @@ Rules: --- -## Orchestrator helpers +## Orchestrator helpers (user_scan) To keep validators DRY, the repository provides helper functions in `core/orchestrator.py`. Use these where appropriate. @@ -177,19 +235,17 @@ Note: The exact parameter names and behavior of the orchestrator functions are d - When providing headers, include a User-Agent and reasonable Accept headers. - Timeouts should be reasonable (3–10 seconds). The orchestrator will usually expose a timeout parameter. -Example common headers: -```py - headers = { - 'User-Agent': "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Mobile Safari/537.36", - 'Accept': "text/html,application/xhtml+xml,application/xml;q=0.9", - 'Accept-Encoding': "gzip, deflate, br, zstd", - 'Upgrade-Insecure-Requests': "1", - } +--- -``` +## When to implement custom logic (user_scan) + +- Use `status_validate` when availability is determined by HTTP status codes (e.g., 404 vs 200). +- Use `generic_validate` when you need to inspect response content or headers and decide availability via a short callback that returns a Result. +- If a platform requires API keys, OAuth, or heavy JS rendering, document it in the PR and consider an "advanced" module that can be enabled separately. --- + ## Return values and error handling - Always return a Result object: @@ -219,26 +275,4 @@ except Exception: --- -## Pull request checklist - -Before opening a PR: -- [x] Add the new validator file in the appropriate folder. -- [x] Prefer using `generic_validate` or `status_validate` where applicable. -- [x] Ensure imports are valid and package can be imported. - -When opening the PR: -- Describe the approach, any heuristics used, and potential edge cases. -- If the platform has rate limits or anti-bot measures, note them and recommend a testing approach. -- If you return Result.error with a message, include why and any reproducible steps if available. - ---- - -## When to implement custom logic - -- Use `status_validate` when availability is determined by HTTP status codes (e.g., 404 vs 200). -- Use `generic_validate` when you need to inspect response content or headers and decide availability via a short callback that returns a Result. -- If a platform requires API keys, OAuth, or heavy JS rendering, document it in the PR and consider an "advanced" module that can be enabled separately. - ---- - Thank you for contributing!