diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8197c87..6367731 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,55 +1,113 @@ # Contributing to user-scanner -Thanks for contributing! This guide explains how to add or modify platform validators correctly, and it includes the orchestrator helpers (generic_validate and status_validate) used to keep validators small and consistent. - --- -## Overview +This project separates two kinds of checks: + +- Username availability checks (under `user_scanner/user_scan/*`) — synchronous validators that the main username scanner uses. +- Email OSINT checks (under `user_scanner/email_scan/`) — asynchronous, multi-step flows that probe signup pages or email-focused APIs. Put email-focused modules in `user_scanner/email_scan/` (subfolders like `social/`, `dev/`, `community`, `creator` etc. are fine — follow the existing tree). + -This project contains small "validator" modules that check whether a username exists on a given platform. Each validator is a single function that returns a Result object (see core/orchestrator.py). +--- -Result semantics: -- Result.available() → available -- Result.taken() → taken -- Result.error(message: Optional[str]) → error, blocked, unknown, or request failure (include short diagnostic message when helpful) +## Module naming for both `email_scan` and `user_scan` modules -Follow this document when adding or updating validators. +- File name must be the platform name in lowercase (no spaces or special characters). + - Examples: `github.py`, `reddit.py`, `x.py`, `pinterest.py` --- -## Folder structure -- `social/` -> Social media platforms (Instagram, Reddit, X, etc.) -- `dev/` -> Developer platforms (GitHub, GitLab, Kaggle, etc.) -- `community/` -> Miscellaneous or community-specific platforms -- Add new directories for new categories as needed. +## Email-scan (email_scan) — guide for contributors -Example: -``` -user_scanner/ -├── social/ -| └── reddit.py -| ... -├── dev/ -| └── launchpad.py -| ... -└── core/ - └── orchestrator.py - ... -``` +Minimal best-practices checklist for email modules +- [ ] Put file in `user_scanner/email_scan//service.py`. +- [ ] Export `async def validate_(email: str) -> Result`. +- [ ] Use `httpx.AsyncClient` for requests, with sensible timeouts and follow_redirects when needed. +- [ ] Add a short docstring describing environment variables (api keys), rate limits, and responsible-use note (if required) -Place each new module in the most relevant folder. +### Example: Mastodon async example: ---- +```python name=user_scanner/email_scan/social/mastodon.py +import httpx +import re +from user_scanner.core.result import Result -## Module naming -- File name must be the platform name in lowercase (no spaces or special characters). - - Examples: `github.py`, `reddit.py`, `x.py`, `pinterest.py` +async def _check(email: str) -> Result: + """ + Internal helper that performs the multi-step signup probe. + Returns: Result.available(), Result.taken(), or Result.error(msg) + """ + base_url = "https://mastodon.social" + signup_url = f"{base_url}/auth/sign_up" + post_url = f"{base_url}/auth" + + headers = { + "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", + "referer": "https://mastodon.social/explore", + "origin": "https://mastodon.social", + } + + async with httpx.AsyncClient(http2=True, headers=headers, follow_redirects=True) as client: + try: + initial_resp = await client.get(signup_url, timeout=15.0) + if initial_resp.status_code != 200: + return Result.error(f"Failed to access signup page: {initial_resp.status_code}") + + # Look for CSRF/auth token in the signup HTML + token_match = re.search(r'name="csrf-token" content="([^"]+)"', initial_resp.text) + if not token_match: + return Result.error("Could not find authenticity token") + + csrf_token = token_match.group(1) + + # Use a dummy username & password for the signup probe + payload = { + "authenticity_token": csrf_token, + "user[account_attributes][username]": "no3motions_robot_020102", + "user[email]": email, + "user[password]": "Theleftalone@me", + "user[password_confirmation]": "Theleftalone@me", + "user[agreement]": "1", + "button": "" + } + + response = await client.post(post_url, data=payload, timeout=15.0) + + # Check the response HTML for the "already taken" phrase + if "has already been taken" in response.text: + return Result.taken() + else: + # If the response does not show taken, treat as available. + # Be aware: services can change wording; prefer explicit checks. + return Result.available() + + except Exception as exc: + # Convert exception to string so Result.error is stable/serializable + return Result.error(str(exc)) + + +async def validate_mastodon(email: str) -> Result: + """ + Public validator used by the email mode. + - Do basic local validation before network calls. + - Return Result.* helpers described above. + """ + if not EMAIL_RE.match(email): + return Result.error("Invalid email format") + return await _check(email) +``` + --- -## Validator function +## Username availability check guide: + + +### Validator function (user_scan/) Each module must expose exactly one validator function named: @@ -66,7 +124,7 @@ Rules: --- -## Orchestrator helpers +## Orchestrator helpers (user_scan) To keep validators DRY, the repository provides helper functions in `core/orchestrator.py`. Use these where appropriate. @@ -177,19 +235,17 @@ Note: The exact parameter names and behavior of the orchestrator functions are d - When providing headers, include a User-Agent and reasonable Accept headers. - Timeouts should be reasonable (3–10 seconds). The orchestrator will usually expose a timeout parameter. -Example common headers: -```py - headers = { - 'User-Agent': "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Mobile Safari/537.36", - 'Accept': "text/html,application/xhtml+xml,application/xml;q=0.9", - 'Accept-Encoding': "gzip, deflate, br, zstd", - 'Upgrade-Insecure-Requests': "1", - } +--- -``` +## When to implement custom logic (user_scan) + +- Use `status_validate` when availability is determined by HTTP status codes (e.g., 404 vs 200). +- Use `generic_validate` when you need to inspect response content or headers and decide availability via a short callback that returns a Result. +- If a platform requires API keys, OAuth, or heavy JS rendering, document it in the PR and consider an "advanced" module that can be enabled separately. --- + ## Return values and error handling - Always return a Result object: @@ -219,26 +275,4 @@ except Exception: --- -## Pull request checklist - -Before opening a PR: -- [x] Add the new validator file in the appropriate folder. -- [x] Prefer using `generic_validate` or `status_validate` where applicable. -- [x] Ensure imports are valid and package can be imported. - -When opening the PR: -- Describe the approach, any heuristics used, and potential edge cases. -- If the platform has rate limits or anti-bot measures, note them and recommend a testing approach. -- If you return Result.error with a message, include why and any reproducible steps if available. - ---- - -## When to implement custom logic - -- Use `status_validate` when availability is determined by HTTP status codes (e.g., 404 vs 200). -- Use `generic_validate` when you need to inspect response content or headers and decide availability via a short callback that returns a Result. -- If a platform requires API keys, OAuth, or heavy JS rendering, document it in the PR and consider an "advanced" module that can be enabled separately. - ---- - Thank you for contributing! diff --git a/README.md b/README.md index 688ec63..df0af23 100644 --- a/README.md +++ b/README.md @@ -12,26 +12,30 @@ --- -Scan a username across multiple social, developer, creator, gaming etc. platforms to see if it’s available. -Perfect for finding a **unique username** across GitHub, Twitter, Reddit, Instagram, and more, all in one command. - - -### Features - -- ✅ Check usernames across **social networks**, **developer platforms**, and **creator communities** -- ✅ Can be used as a username OSINT tool -- ✅ Smart auto-update system, Detects new releases on PyPI and interactively prompts the user to upgrade. -- ✅ Clear **Available / Taken / Error** output for each platform -- ✅ Robust error handling: It prints the exact reason (e.g. Cannot use underscores, hyphens at the start/end) -- ✅ Fully modular: add new platform modules easily -- ✅ Wildcard-based username permutations for automatic variation generation using provided suffix -- ✅ Selection of results format (e.g. json, csv, console (default)) -- ✅ Get the scanning results in preferred format (json/csv) in specified output file (suitable for power users) -- ✅ Command-line interface ready: works directly after `pip install` -- ✅ Very low and lightweight dependencies, can be run on any machine +A powerful *Email OSINT tool* that checks if a specific email is registered on various sites, combined with *username scanning* — 2-in-1 solution. + +Perfect for fast, accurate and lightweight email OSINT + +Perfect for finding a **unique username** across GitHub, Twitter, Reddit, Instagram, and more, all in a single command. + +## Features + +- ✅ Check an email across multiple sites to see if it’s registered. +- ✅ Scan usernames across **social networks**, **developer platforms**, **creator communities**, and more. +- ✅ Can be used purely as a username tool. +- ✅ Smart auto-update system detects new releases on PyPI and prompts the user to upgrade interactively. +- ✅ Clear `Registered` and `Not Registered` for email scanning `Available` / `Taken` / `Error` output for username scans +- ✅ Robust error handling: displays the exact reason a username or email cannot be used (e.g., underscores or hyphens at the start/end). +- ✅ Fully modular: easily add new platform modules. +- ✅ Wildcard-based username permutations for automatic variation generation using a provided suffix. +- ✅ Option to select results format (**JSON**, **CSV**, console). +- ✅ Save scanning and OSINT results in the preferred format and output file (ideal for power users). +- ✅ Command-line interface ready: works immediately after `pip install`. +- ✅ Lightweight with minimal dependencies; runs on any machine. + --- -### Installation +## Installation ```bash pip install user-scanner @@ -39,43 +43,59 @@ pip install user-scanner --- -### Usage +## Usage -Scan a username across all platforms: +### Basic username/email scan -```bash -user-scanner -u -``` -Optionally, scan a specific category or single module: +Scan a single username across **all** available modules/platforms: ```bash -user-scanner -u -c dev -user-scanner -l # Lists all available modules -user-scanner -u -m github +user-scanner -e john_doe@gmail.com +user-scanner --email john_doe@gmail.com # long version + +user-scanner -u john_doe +user-scanner --username john_doe # long version + ``` -Also, the output file and format can be specified:
+### Selective scanning + +Scan only specific categories or single modules: ```bash -user-scanner -u -f console #Default format -user-scanner -u -f csv -user-scanner -u -f json -user-scanner -u -f -o +user-scanner -u john_doe -c dev # developer platforms only +user-scanner -u john_doe -m github # only GitHub + ``` -Generate multiple username variations by appending a suffix: +List all available modules/categories: ```bash -user-scanner -u -p +user-scanner -l + ``` -Optionally, scan a specific category or single module with limit: + +### Username/Email variations (suffix only) + +Generate & check username variations using a permutation from the given suffix: ```bash -user-scanner -u -p -c dev -user-scanner -u -p -m github -user-scanner -u -p -s # limit generation of usernames -user-scanner -u -p -d # delay to avoid rate-limits (can be 0s-1s) +user-scanner -u john_ -p ab # john_a, ..., john_ab, john_ba ``` + +## Important Flags + +| Flag | Description | +|------|-------------| +| `-c, --category CATEGORY` | Scan all platforms in a specific category | +| `-l, --list` | List all available modules for username scanning | +| `-m, --module MODULE` | Scan a single specific module | +| `-p, --permute PERMUTE` | Generate username permutations using a pattern/suffix | +| `-s, --stop STOP` | Limit the number of permutations generated | +| `-d, --delay DELAY` | Delay (in seconds) between requests | +| `-f, --format {csv,json}` | Select output format | +| `-o, --output OUTPUT` | Save results to a file | + --- ### Update @@ -84,12 +104,10 @@ Update the tool to the latest PyPI version: ```bash user-scanner -U - ``` - --- -### Screenshot: +## Screenshot: - Note*: New modules are constantly getting added so this might have only limited, outdated output: @@ -105,19 +123,25 @@ user-scanner -U user-scanner's JSON output screenshot +--- -### Contributing: +## Contributing -Modules are organized by category: +Modules are organized under `user_scanner/`: ``` user_scanner/ -├── dev/ # Developer platforms (GitHub, GitLab, etc.) -├── social/ # Social platforms (Twitter/X, Reddit, Instagram, etc.) -├── creator/ # Creator platforms (Hashnode, Dev.to, Medium, etc.) -├── community/ # Community platforms (forums, niche sites) -├── gaming/ # Gaming sites (chess.com, roblox, monkeytype etc.) -├── donation/ # Donation taking sites (buymeacoffe.com, similar...) +├── email_scan/ # Currently in development +│ └── social/ # Social email scan modules (Instagram, Mastodon, X, etc.) +| ... # New sites to be added soon +├── user_scan/ +│ ├── dev/ # Developer platforms (GitHub, GitLab, npm, etc.) +│ ├── social/ # Social platforms (Twitter/X, Reddit, Instagram, Discord, etc.) +│ ├── creator/ # Creator platforms (Hashnode, Dev.to, Medium, Patreon, etc.) +│ ├── community/ # Community platforms (forums, StackOverflow, HackerNews, etc.) +│ ├── gaming/ # Gaming sites (chess.com, Lichess, Roblox, Minecraft, etc.) +│ └── donation/ # Donation platforms (BuyMeACoffee, Liberapay) +|... ``` **Module guidelines:** @@ -134,20 +158,20 @@ See [CONTRIBUTING.md](CONTRIBUTING.md) for examples. --- -### Dependencies: +## Dependencies: - [httpx](https://pypi.org/project/httpx/) - [colorama](https://pypi.org/project/colorama/) --- -### License +## License This project is licensed under the **MIT License**. See [LICENSE](LICENSE) for details. --- -### Star History +## Star History diff --git a/requirements.txt b/requirements.txt index 0bb9a42..016a6bc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -httpx +httpx[http2] colorama diff --git a/tests/test_formatter.py b/tests/test_formatter.py new file mode 100644 index 0000000..c023eee --- /dev/null +++ b/tests/test_formatter.py @@ -0,0 +1,35 @@ +from user_scanner.core.formatter import into_csv, into_json, indentate, INDENT +from user_scanner.core.result import Result + + +def test_indentate(): + assert indentate("", -1) == "" + assert indentate("", 0) == "" + assert indentate("", 2) == 2 * INDENT + + msg = ("This is a test message\n" + "made to test the identation\n" + "and shouldn't be changed.") + + for i in range(0, 4): + new = indentate(msg, i) + for line in new.split("\n"): + assert line.find(INDENT * i) == 0 + + +def test_get_result_output_formats(): + res = Result.available( + username="alice", site_name="ExampleSite", category="Cat") + + out_console = res.get_console_output() + assert "Available" in out_console + assert "ExampleSite" in out_console + assert "alice" in out_console + + out_json = into_json([res]) + assert '"username": "alice"' in out_json + assert '"site_name": "ExampleSite"' in out_json + + out_csv = into_csv([res]) + assert "alice" in out_csv + assert "ExampleSite" in out_csv diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..bd46eec --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,34 @@ +from user_scanner.core import helpers +from types import SimpleNamespace + +def test_generate_permutations(): + perms = helpers.generate_permutations("user", "ab", limit=None) + assert "user" in perms + # All permutations must be valid + assert all( + p == "user" or + (p.startswith("user") and len(p) > len("user")) + for p in perms + ) + + assert len(perms) > 1 + +def test_generate_permutations_email(): + perms = helpers.generate_permutations("john@email.com", "abc", limit=None, is_email=True) + assert "john@email.com" in perms + assert all( + p == "john@email.com" or + (p.startswith("john") and len(p) > len("john@email.com") and p.endswith("@email.com")) + for p in perms + ) + assert len(perms) > 1 + +def test_get_site_name(): + def module(name:str) -> SimpleNamespace: + return SimpleNamespace(**{"__name__":name}) + assert helpers.get_site_name(module("X")) == "X (Twitter)" + assert helpers.get_site_name(module("x")) == "X (Twitter)" + assert helpers.get_site_name(module("user_scanner.github")) == "Github" + assert helpers.get_site_name(module("user_scanner.chess_com")) == "Chess.com" + + diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index c04afe2..7b1b3c4 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -1,7 +1,6 @@ import types from types import SimpleNamespace from user_scanner.core import orchestrator -from user_scanner.cli.printer import Printer from user_scanner.core.result import Result @@ -13,18 +12,6 @@ def test_status_validate_available(monkeypatch): assert res.to_number() == 1 # AVAILABLE -def test_generate_permutations(): - perms = orchestrator.generate_permutations("user", "ab", limit=None) - assert "user" in perms - # All permutations must be valid - assert all( - p == "user" or - (p.startswith("user") and len(p) > len("user")) - for p in perms - ) - - assert len(perms) > 1 - def test_run_module_single_prints_json_and_csv(capsys): module = types.ModuleType("fake.testsite") module.__file__ = "/fake/testsite.py" @@ -34,16 +21,10 @@ def validate_testsite(username): setattr(module, "validate_testsite", validate_testsite) - p_json = Printer("json") - orchestrator.run_module_single(module, "bob", p_json, last=True) + orchestrator.run_user_module(module, "bob") out = capsys.readouterr().out - assert '"username": "bob"' in out + assert 'bob' in out #Needs to be improved - p_csv = Printer("csv") - orchestrator.run_module_single(module, "bob", p_csv, last=True) - out2 = capsys.readouterr().out - assert "bob" in out2 - assert "Testsite" in out2 or "testsite" in out2 def test_run_checks_category_threaded(monkeypatch, tmp_path): @@ -60,8 +41,7 @@ def validate(username): monkeypatch.setattr(orchestrator, "load_modules", lambda p: [module]) monkeypatch.setattr(orchestrator, "get_site_name", lambda m: "Testsite") - p = Printer("console") - results = orchestrator.run_checks_category(tmp_path, "someone", p, last=True) + results = orchestrator.run_user_category(tmp_path, "someone") assert isinstance(results, list) assert len(results) == 1 assert results[0].to_number() == 0 # TAKEN diff --git a/tests/test_printer.py b/tests/test_printer.py deleted file mode 100644 index ad398e7..0000000 --- a/tests/test_printer.py +++ /dev/null @@ -1,64 +0,0 @@ -from user_scanner.cli.printer import Printer, indentate, INDENT -from user_scanner.core.result import Result -import pytest - - -def test_indentate(): - assert indentate("", -1) == "" - assert indentate("", 0) == "" - assert indentate("", 2) == 2 * INDENT - - msg = ("This is a test message\n" - "made to test the identation\n" - "and shouldn't be changed.") - - for i in range(0, 4): - new = indentate(msg, i) - for line in new.split("\n"): - assert line.find(INDENT * i) == 0 - - -def test_creation(): - assert Printer("console").mode == "console" - assert Printer("csv").mode == "csv" - assert Printer("json").mode == "json" - - with pytest.raises(ValueError): - Printer(2) - - -def test_is_properties(): - p = Printer("console") - assert p.is_console is True - assert p.is_csv is False - assert p.is_json is False - - p = Printer("csv") - assert p.is_console is False - assert p.is_csv is True - assert p.is_json is False - - p = Printer("json") - assert p.is_console is False - assert p.is_csv is False - assert p.is_json is True - - -def test_get_result_output_formats(): - res = Result.available(username="alice", site_name="ExampleSite", category="Cat") - - p_console = Printer("console") - out_console = p_console.get_result_output(res) - assert "Available" in out_console - assert "ExampleSite" in out_console - assert "alice" in out_console - - p_json = Printer("json") - out_json = p_json.get_result_output(res) - assert '"username": "alice"' in out_json - assert '"site_name": "ExampleSite"' in out_json - - p_csv = Printer("csv") - out_csv = p_csv.get_result_output(res) - assert "alice" in out_csv - assert "ExampleSite" in out_csv diff --git a/user_scanner/__main__.py b/user_scanner/__main__.py index 4464808..127ee25 100644 --- a/user_scanner/__main__.py +++ b/user_scanner/__main__.py @@ -1,16 +1,35 @@ import argparse import time import sys -from user_scanner.cli import printer -from user_scanner.core.orchestrator import generate_permutations, load_categories +import re from colorama import Fore, Style + from user_scanner.cli.banner import print_banner -from typing import List -from user_scanner.core.result import Result -from user_scanner.core.helpers import is_last_value +from user_scanner.core.version import load_local_version +from user_scanner.core import formatter from user_scanner.utils.updater_logic import check_for_updates from user_scanner.utils.update import update_self +from user_scanner.core.helpers import ( + load_categories, + load_modules, + find_module, + get_site_name, + generate_permutations +) + +from user_scanner.core.orchestrator import ( + run_user_full, + run_user_category, + run_user_module +) + +from user_scanner.core.email_orchestrator import ( + run_email_full_batch, + run_email_category_batch, + run_email_module_batch +) + # Color configs R = Fore.RED G = Fore.GREEN @@ -18,163 +37,139 @@ Y = Fore.YELLOW X = Fore.RESET - -MAX_PERMUTATIONS_LIMIT = 100 # To prevent excessive generation +MAX_PERMUTATIONS_LIMIT = 100 def main(): - parser = argparse.ArgumentParser( prog="user-scanner", - description="Scan usernames across multiple platforms." - ) - parser.add_argument( - "-u", "--username", help="Username to scan across platforms" - ) - parser.add_argument( - "-c", "--category", choices=load_categories().keys(), - help="Scan all platforms in a category" - ) - parser.add_argument( - "-m", "--module", help="Scan a single specific module across all categories" - ) - parser.add_argument( - "-l", "--list", action="store_true", help="List all available modules by category" - ) - parser.add_argument( - "-v", "--verbose", action="store_true", help="Enable verbose output" + description="Scan usernames or emails across multiple platforms." ) - parser.add_argument( - "-p", "--permute",type=str,help="Generate username permutations using a string pattern (e.g -p 234)" - ) - parser.add_argument( - "-s", "--stop",type=int,default=MAX_PERMUTATIONS_LIMIT,help="Limit the number of username permutations generated" - ) + group = parser.add_mutually_exclusive_group(required=False) + group.add_argument("-u", "--username", + help="Username to scan across platforms") + group.add_argument("-e", "--email", help="Email to scan across platforms") - parser.add_argument( - "-d", "--delay",type=float,default=0,help="Delay in seconds between requests (recommended: 1-2 seconds)" - ) + parser.add_argument("-c", "--category", + help="Scan all platforms in a category") - parser.add_argument( - "-f", "--format", choices=["console", "csv", "json"], default="console", help="Specify the output format (default: console)" - ) + parser.add_argument("-m", "--module", help="Scan a single specific module") + + parser.add_argument("-l", "--list", action="store_true", + help="List all available modules for username scanning") + + parser.add_argument("-v", "--verbose", action="store_true", + help="Enable verbose output") + + parser.add_argument("-p", "--permute", type=str, + help="Generate permutations using a pattern") + + parser.add_argument("-s", "--stop", type=int, + default=MAX_PERMUTATIONS_LIMIT, help="Limit permutations") + + parser.add_argument("-d", "--delay", type=float, + default=0, help="Delay between requests") parser.add_argument( - "-o", "--output", type=str, help="Specify the output file" - ) + "-f", "--format", choices=["csv", "json"], help="Output format") + + parser.add_argument("-o", "--output", type=str, help="Output file path") + parser.add_argument( - "-U", "--update", action="store_true", help="Update user-scanner to latest version" - ) + "-U", "--update", action="store_true", help="Update the tool") - args = parser.parse_args() + parser.add_argument("--version", action="store_true", help="Print version") - Printer = printer.Printer(args.format) + args = parser.parse_args() - if args.update is True: + if args.update: update_self() print(f"[{G}+{X}] {G}Update successful. Please restart the tool.{X}") sys.exit(0) + if args.version: + version, _ = load_local_version() + print(f"user-scanner current version -> {G}{version}{X}") + sys.exit(0) + if args.list: - Printer.print_modules(args.category) + is_email_list = args.email is not None + categories = load_categories(is_email_list) + for cat_name, cat_path in categories.items(): + modules = load_modules(cat_path) + print(Fore.MAGENTA + + f"\n== {cat_name.upper()} SITES =={Style.RESET_ALL}") + for module in modules: + print(f" - {get_site_name(module)}") return - check_for_updates() - - if not args.username: + if not (args.username or args.email): parser.print_help() return + check_for_updates() + print_banner() - if Printer.is_console: - print_banner() + is_email = args.email is not None + if is_email and not re.findall(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", args.email): + print(R + "[✘] Error: Invalid email format." + X) + sys.exit(1) - if args.permute and args.delay == 0 and Printer.is_console: - print( - Y - + "[!] Warning: You're generating multiple usernames with NO delay between requests. " - "This may trigger rate limits or IP bans. Use --delay 1 or higher. (Use only if the sites throw errors otherwise ignore)\n" - + Style.RESET_ALL) + target_name = args.username or args.email + targets = [target_name] - usernames = [args.username] # Default single username list - - # Added permutation support , generate all possible permutation of given sequence. if args.permute: - usernames = generate_permutations(args.username, args.permute , args.stop) - if Printer.is_console: - print( - C + f"[+] Generated {len(usernames)} username permutations" + Style.RESET_ALL) - - if args.module and "." in args.module: - args.module = args.module.replace(".", "_") - - def run_all_usernames(func, arg = None) -> List[Result]: - """ - Executes a function for all given usernames. - Made in order to simplify main() - """ - results = [] - print(Printer.get_start()) - for i, name in enumerate(usernames): - is_last = i == len(usernames) - 1 - if arg is None: - results.extend(func(name, Printer, is_last)) - else: - results.extend(func(arg, name, Printer, is_last)) - if args.delay > 0 and not is_last: - time.sleep(args.delay) - if Printer.is_json: - print(Printer.get_end()) - return results + targets = generate_permutations( + target_name, args.permute, args.stop, is_email) + print( + C + f"[+] Generated {len(targets)} permutations" + Style.RESET_ALL) - results = [] + results = [] - if args.module: - # Single module search across all categories - from user_scanner.core.orchestrator import run_module_single, find_module - modules = find_module(args.module) + for i, target in enumerate(targets): + if i != 0 and args.delay: + time.sleep(args.delay) - if len(modules) > 0: - for module in modules: - results.extend(run_all_usernames(run_module_single, module)) + if is_email: + print(f"\n{Fore.CYAN} Checking email: {target}{Style.RESET_ALL}") else: - print( - R + f"[!] Module '{args.module}' not found in any category." + Style.RESET_ALL) - - elif args.category: - # Category-wise scan - category_package = load_categories().get(args.category) - from user_scanner.core.orchestrator import run_checks_category - results = run_all_usernames(run_checks_category, category_package) - - else: - # Full scan - from user_scanner.core.orchestrator import run_checks - results = run_all_usernames(run_checks) - - if not args.output: - return - - if args.output and Printer.is_console: - msg = ( - "\n[!] The console format cannot be " - f"written to file: '{args.output}'." - ) - print(R + msg + Style.RESET_ALL) - return - - content = Printer.get_start() - - for i,result in enumerate(results): - char = "" if Printer.is_csv or is_last_value(results, i) else "," - content += "\n" + Printer.get_result_output(result) + char - - if Printer.is_json: - content += "\n" + Printer.get_end() - - with open(args.output, "a", encoding="utf-8") as f: - f.write(content) + print(f"\n{Fore.CYAN} Checking username: {target}{Style.RESET_ALL}") + + if args.module: + modules = find_module(args.module, is_email) + fn = run_email_module_batch if is_email else run_user_module + if modules: + for module in modules: + results.extend(fn(module, target)) + else: + print( + R + + f"[!] {'Email' if is_email else 'User'} module '{args.module}' not found." + + Style.RESET_ALL + ) + + elif args.category: + cat_path = load_categories(is_email).get(args.category) + fn = run_email_category_batch if is_email else run_user_category + if cat_path: + results.extend(fn(cat_path, target)) + else: + print( + R + + f"[!] {'Email' if is_email else 'User'} category '{args.module}' not found." + + Style.RESET_ALL + ) + else: + fn = run_email_full_batch if is_email else run_user_full + results.extend(fn(target)) + + if args.output: + content = formatter.into_csv( + results) if args.format == "csv" else formatter.into_json(results) + with open(args.output, "a", encoding="utf-8") as f: + f.write(content) + print(G + f"\n[+] Results saved to {args.output}" + Style.RESET_ALL) if __name__ == "__main__": diff --git a/user_scanner/cli/printer.py b/user_scanner/cli/printer.py deleted file mode 100644 index 01f035f..0000000 --- a/user_scanner/cli/printer.py +++ /dev/null @@ -1,117 +0,0 @@ -from colorama import Fore, Style -from typing import Literal -from user_scanner.core.result import Result, Status - -INDENT = " " -CSV_HEADER = "username,category,site_name,status,url,reason" - - -def indentate(msg: str, indent: int): - if indent <= 0: - return msg - tabs = INDENT * indent - return "\n".join([f"{tabs}{line}" for line in msg.split("\n")]) - - -class Printer: - def __init__(self, format: Literal["console", "csv", "json"]) -> None: - if format not in ["console", "csv", "json"]: - raise ValueError(f"Invalid output-format: {format}") - self.mode: str = format - self.indent: int = 0 - - @property - def is_console(self) -> bool: - return self.mode == "console" - - @property - def is_csv(self) -> bool: - return self.mode == "csv" - - @property - def is_json(self) -> bool: - return self.mode == "json" - - def get_start(self, json_char: str = "[") -> str: - if self.is_json: - self.indent += 1 - return indentate(json_char, self.indent - 1) - elif self.is_csv: - return CSV_HEADER - return "" - - def get_end(self, json_char: str = "]") -> str: - if not self.is_json: - return "" - self.indent = max(self.indent - 1, 0) - return indentate(json_char, self.indent) - - def get_result_output(self, result: Result) -> str: - # In principle result should always have this - site_name = result.site_name - username = result.username - - match (result.status, self.mode): - case (Status.AVAILABLE, "console"): - return f"{INDENT}{Fore.GREEN}[✔] {site_name} ({username}): Available{Style.RESET_ALL}" - - case (Status.TAKEN, "console"): - return f"{INDENT}{Fore.RED}[✘] {site_name} ({username}): Taken{Style.RESET_ALL}" - - case (Status.ERROR, "console"): - reason = "" - if isinstance(result, Result) and result.has_reason(): - reason = f" ({result.get_reason()})" - return f"{INDENT}{Fore.YELLOW}[!] {site_name} ({username}): Error{reason}{Style.RESET_ALL}" - - case (_, "json"): - return indentate(result.to_json().replace("\t", INDENT), self.indent) - - case (_, "csv"): - return result.to_csv() - - return "" - - def print_modules(self, category: str | None = None): - from user_scanner.core.orchestrator import load_categories, load_modules - categories = load_categories() - categories_to_list = [category] if category else categories.keys() - - # Print the start - if self.is_json: - print(self.get_start("{")) - elif self.is_csv: - print("category,site_name") - - for i, cat_name in enumerate(categories_to_list): - path = categories[cat_name] - modules = load_modules(path) - - # Print for each category - match self.mode: - case "console": - print(Fore.MAGENTA + - f"\n== {cat_name.upper()} SITES =={Style.RESET_ALL}") - case "json": - print(self.get_start(f"\"{cat_name}\": [")) - - for j, module in enumerate(modules): - is_last = j == len(modules) - 1 - site_name = module.__name__.split(".")[-1].capitalize() - - # Print for each site name - match self.mode: - case "console": - print(f"{INDENT}- {site_name}") - case "json": - msg = f"\"{site_name}\"" + ("" if is_last else ",") - print(indentate(msg, self.indent)) - case "csv": - print(f"{cat_name},{site_name}") - - if self.is_json: - is_last = i == len(categories_to_list) - 1 - print(self.get_end("]" if is_last else "],")) - - if self.is_json: - print(self.get_end("}")) diff --git a/user_scanner/core/email_orchestrator.py b/user_scanner/core/email_orchestrator.py new file mode 100644 index 0000000..77dceb7 --- /dev/null +++ b/user_scanner/core/email_orchestrator.py @@ -0,0 +1,78 @@ +import asyncio +from pathlib import Path +from typing import List +from types import ModuleType +from colorama import Fore, Style + +from user_scanner.core.helpers import load_categories, load_modules, find_category +from user_scanner.core.result import Result + +# Concurrency control +MAX_CONCURRENT_REQUESTS = 15 + + +async def _async_worker(module: ModuleType, email: str, sem: asyncio.Semaphore) -> Result: + async with sem: + module_name = module.__name__.split('.')[-1] + func_name = f"validate_{module_name}" + + if not hasattr(module, func_name): + return Result.error(f"Function {func_name} not found") + + func = getattr(module, func_name) + + try: + res = func(email) + result = await res if asyncio.iscoroutine(res) else res + except Exception as e: + result = Result.error(e) + + # Use helper to get actual dir name for the Result object + actual_cat = find_category(module) or "Email" + + result.update( + site_name=module_name.capitalize(), + username=email, + category=actual_cat, + is_email=True + ) + + print(result.get_console_output()) + return result + + +async def _run_batch(modules: List[ModuleType], email:str) -> List[Result]: + sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) + tasks = [] + for module in modules: + tasks.append(_async_worker(module, email, sem)) + + if not tasks: + return [] + return list(await asyncio.gather(*tasks)) + + +def run_email_module_batch(module: ModuleType, email: str) -> List[Result]: + return asyncio.run(_run_batch([module], email)) + + +def run_email_category_batch(category_path: Path, email: str) -> List[Result]: + cat_name = category_path.stem.capitalize() + print(f"\n{Fore.MAGENTA}== {cat_name} SITES =={Style.RESET_ALL}") + + modules = load_modules(category_path) + return asyncio.run(_run_batch(modules, email)) + + +def run_email_full_batch(email: str) -> List[Result]: + categories = load_categories(is_email=True) + all_results = [] + + for cat_name, cat_path in categories.items(): + print(f"\n{Fore.MAGENTA}== {cat_name.upper()} SITES =={Style.RESET_ALL}") + + modules = load_modules(cat_path) + cat_results = asyncio.run(_run_batch(modules, email)) + all_results.extend(cat_results) + + return all_results diff --git a/user_scanner/core/formatter.py b/user_scanner/core/formatter.py new file mode 100644 index 0000000..1018861 --- /dev/null +++ b/user_scanner/core/formatter.py @@ -0,0 +1,27 @@ +from user_scanner.core.result import Result +from typing import List + +INDENT = " " +CSV_HEADER = "username,category,site_name,status,reason" + + +def indentate(msg: str, indent: int): + if indent <= 0: + return msg + tabs = INDENT * indent + return "\n".join([f"{tabs}{line}" for line in msg.split("\n")]) + + +def into_json(results: List[Result]) -> str: + res = "[\n" + + for i, result in enumerate(results): + is_last = i == len(results) - 1 + end = "" if is_last else "," + res += indentate(result.to_json().replace("\t", INDENT), 1) + end + "\n" + + return res + "]" + + +def into_csv(results: List[Result]) -> str: + return CSV_HEADER + "\n" + "\n".join(result.to_csv() for result in results) diff --git a/user_scanner/core/helpers.py b/user_scanner/core/helpers.py index 28c65b8..5ba6959 100644 --- a/user_scanner/core/helpers.py +++ b/user_scanner/core/helpers.py @@ -1,3 +1,11 @@ +import importlib +import importlib.util +from itertools import permutations +from types import ModuleType +from pathlib import Path +from typing import Dict, List + + def get_site_name(module) -> str: name = module.__name__.split('.')[-1].capitalize().replace("_", ".") if name == "X": @@ -5,5 +13,83 @@ def get_site_name(module) -> str: return name -def is_last_value(values, i: int) -> bool: - return i == len(values) - 1 +def load_modules(category_path: Path) -> List[ModuleType]: + modules = [] + for file in category_path.glob("*.py"): + if file.name == "__init__.py": + continue + spec = importlib.util.spec_from_file_location(file.stem, str(file)) + if spec is None or spec.loader is None: + continue + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + modules.append(module) + return modules + + +def load_categories(is_email: bool = False) -> Dict[str, Path]: + folder_name = "email_scan" if is_email else "user_scan" + root = Path(__file__).resolve().parent.parent / folder_name + categories = {} + + for subfolder in root.iterdir(): + if subfolder.is_dir() and \ + subfolder.name.lower() not in ["cli", "utils", "core"] and \ + "__" not in subfolder.name: # Removes __pycache__ + categories[subfolder.name] = subfolder.resolve() + + return categories + + +def find_module(name: str, is_email: bool = False) -> List[ModuleType]: + name = name.lower() + + return [ + module + for category_path in load_categories(is_email).values() + for module in load_modules(category_path) + if module.__name__.split(".")[-1].lower() == name + ] + + +def find_category(module: ModuleType) -> str | None: + + module_file = getattr(module, '__file__', None) + if not module_file: + return None + + category = Path(module_file).parent.name.lower() + if category in load_categories(False) or category in load_categories(True): + return category.capitalize() + + return None + + +def generate_permutations(username: str, pattern: str, limit: int | None = None, is_email: bool = False) -> List[str]: + """ + Generate all order-based permutations of characters in `pattern` + appended after `username`. + """ + + if limit and limit <= 0: + return [] + + permutations_set = {username} + chars = list(pattern) + + domain = "" + if is_email: + username, domain = username.strip().split("@") + + # generate permutations of length 1 → len(chars) + for r in range(len(chars)): + for combo in permutations(chars, r): + new = username + ''.join(combo) + if is_email: + new += "@" + domain + permutations_set.add(new) + if limit and len(permutations_set) >= limit: + return sorted(permutations_set) + + return sorted(permutations_set) diff --git a/user_scanner/core/orchestrator.py b/user_scanner/core/orchestrator.py index 04dd4c0..cec1f9f 100644 --- a/user_scanner/core/orchestrator.py +++ b/user_scanner/core/orchestrator.py @@ -1,72 +1,14 @@ -import importlib -import importlib.util from colorama import Fore, Style from concurrent.futures import ThreadPoolExecutor -from itertools import permutations import httpx from pathlib import Path -from user_scanner.cli.printer import Printer from user_scanner.core.result import Result -from typing import Callable, Dict, List -from user_scanner.core.helpers import get_site_name, is_last_value +from typing import Callable, List +from types import ModuleType +from user_scanner.core.helpers import find_category, get_site_name, load_categories, load_modules -def load_modules(category_path: Path): - modules = [] - for file in category_path.glob("*.py"): - if file.name == "__init__.py": - continue - spec = importlib.util.spec_from_file_location(file.stem, str(file)) - if spec is None or spec.loader is None: - continue - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - - modules.append(module) - return modules - - -def load_categories() -> Dict[str, Path]: - root = Path(__file__).resolve().parent.parent / "user_scan" - categories = {} - - for subfolder in root.iterdir(): - if subfolder.is_dir() and \ - subfolder.name.lower() not in ["cli", "utils", "core"] and \ - "__" not in subfolder.name: # Removes __pycache__ - categories[subfolder.name] = subfolder.resolve() - - return categories - - -def find_module(name: str): - name = name.lower() - - matches = [ - module - for category_path in load_categories().values() - for module in load_modules(category_path) - if module.__name__.split(".")[-1].lower() == name - ] - - return matches - -def find_category(module) -> str | None: - - module_file = getattr(module, '__file__', None) - if not module_file: - return None - - category = Path(module_file).parent.name.lower() - categories = load_categories() - if category in categories: - return category.capitalize() - - return None - - - -def worker_single(module, username: str) -> Result: +def _worker_single(module: ModuleType, username: str) -> Result: func = next((getattr(module, f) for f in dir(module) if f.startswith("validate_") and callable(getattr(module, f))), None) @@ -81,60 +23,43 @@ def worker_single(module, username: str) -> Result: return result except Exception as e: return Result.error(e, site_name=site_name, username=username) - - -def run_module_single(module, username: str, printer: Printer, last: bool = True) -> List[Result]: - result = worker_single(module, username) + +def run_user_module(module: ModuleType, username: str) -> List[Result]: + result = _worker_single(module, username) category = find_category(module) if category: result.update(category=category) - get_site_name(module) - msg = printer.get_result_output(result) - if not last and printer.is_json: - msg += "," - print(msg) + print(result.get_console_output()) return [result] - -def run_checks_category(category_path: Path, username: str, printer: Printer, last: bool = True) -> List[Result]: - modules = load_modules(category_path) - +def run_user_category(category_path: Path, username: str) -> List[Result]: category_name = category_path.stem.capitalize() - if printer.is_console: - print(f"\n{Fore.MAGENTA}== {category_name} SITES =={Style.RESET_ALL}") + print(f"\n{Fore.MAGENTA}== {category_name} SITES =={Style.RESET_ALL}") results = [] + modules = load_modules(category_path) with ThreadPoolExecutor(max_workers=10) as executor: - exec_map = executor.map(lambda m: worker_single(m, username), modules) - for i, result in enumerate(exec_map): - result.update(category = category_name) + exec_map = executor.map(lambda m: _worker_single(m, username), modules) + for result in exec_map: + result.update(category=category_name) results.append(result) - is_last = last and is_last_value(modules, i) - get_site_name(modules[i]) - msg = printer.get_result_output(result) - if not is_last and printer.is_json: - msg += "," - print(msg) + print(result.get_console_output()) return results -def run_checks(username: str, printer: Printer, last: bool = True) -> List[Result]: - if printer.is_console: - print(f"\n{Fore.CYAN} Checking username: {username}{Style.RESET_ALL}") - +def run_user_full(username: str) -> List[Result]: results = [] categories = list(load_categories().values()) - for i, category_path in enumerate(categories): - last_cat = last and (i == len(categories) - 1) - temp = run_checks_category(category_path, username, printer, last_cat) + for category_path in categories: + temp = run_user_category(category_path, username) results.extend(temp) return results @@ -194,22 +119,3 @@ def contains(a, b): return (isinstance(a, list) and b in a) or (a == b) return Result.error("Status didn't match. Report this on Github.") return generic_validate(url, inner, **kwargs) - - -def generate_permutations(username, pattern, limit=None): - """ - Generate all order-based permutations of characters in `pattern` - appended after `username`. - """ - permutations_set = {username} - - chars = list(pattern) - - # generate permutations of length 1 → len(chars) - for r in range(1, len(chars) + 1): - for combo in permutations(chars, r): - permutations_set.add(username + ''.join(combo)) - if limit and len(permutations_set) >= limit: - return list(permutations_set)[:limit] - - return sorted(permutations_set) diff --git a/user_scanner/core/result.py b/user_scanner/core/result.py index 865657f..bf4a8c3 100644 --- a/user_scanner/core/result.py +++ b/user_scanner/core/result.py @@ -1,4 +1,5 @@ from enum import Enum +from colorama import Fore, Style DEBUG_MSG = """Result {{ status: {status}, @@ -6,6 +7,7 @@ username: "{username}", site_name: "{site_name}", category: "{category}", + is_email: "{is_email}" }}""" JSON_TEMPLATE = """{{ @@ -35,8 +37,16 @@ class Status(Enum): AVAILABLE = 1 ERROR = 2 + def to_label(self, is_email=False): + """Returns the appropriate text label based on the scan type""" + if self == Status.ERROR: + return "Error" + if is_email: + return "Registered" if self == Status.TAKEN else "Not Registered" + return "Taken" if self == Status.TAKEN else "Available" + def __str__(self): - return super().__str__().split(".")[1].capitalize() + return self.to_label(is_email=False) class Result: @@ -47,10 +57,11 @@ def __init__(self, status: Status, reason: str | Exception | None = None, **kwar self.username = None self.site_name = None self.category = None + self.is_email = False # Track if this is an email result self.update(**kwargs) def update(self, **kwargs): - for field in ("username", "site_name", "category"): + for field in ("username", "site_name", "category", "is_email"): if field in kwargs and kwargs[field] is not None: setattr(self, field, kwargs[field]) @@ -92,18 +103,22 @@ def get_reason(self) -> str: def as_dict(self) -> dict: return { - "status": self.status, + "status": self.status.to_label(self.is_email), # Use dynamic labels "reason": self.get_reason(), "username": self.username, "site_name": self.site_name, - "category": self.category + "category": self.category, + "is_email": self.is_email } def debug(self) -> str: return DEBUG_MSG.format(**self.as_dict()) def to_json(self) -> str: - return JSON_TEMPLATE.format(**self.as_dict()) + msg = JSON_TEMPLATE.format(**self.as_dict()) + if self.is_email: + msg = msg.replace("\t\"username\":", "\t\"email\":") + return msg def to_csv(self) -> str: return CSV_TEMPLATE.format(**self.as_dict()) @@ -123,3 +138,19 @@ def __eq__(self, other): return NotImplemented + def get_console_output(self) -> str: + site_name = self.site_name + username = self.username + status_text = self.status.to_label(self.is_email) + + if self == Status.AVAILABLE: + return f" {Fore.GREEN}[✔] {site_name} ({username}): {status_text}{Style.RESET_ALL}" + elif self == Status.TAKEN: + return f" {Fore.RED}[✘] {site_name} ({username}): {status_text}{Style.RESET_ALL}" + elif self == Status.ERROR: + reason = "" + if isinstance(self, Result) and self.has_reason(): + reason = f" ({self.get_reason()})" + return f" {Fore.YELLOW}[!] {site_name} ({username}): {status_text}{reason}{Style.RESET_ALL}" + + return "" diff --git a/user_scanner/email_scan/__init__.py b/user_scanner/email_scan/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/user_scanner/email_scan/porn/pornhub.py b/user_scanner/email_scan/porn/pornhub.py new file mode 100644 index 0000000..33c8b14 --- /dev/null +++ b/user_scanner/email_scan/porn/pornhub.py @@ -0,0 +1,62 @@ +import httpx +import re +from user_scanner.core.result import Result + +async def _check(email: str) -> Result: + base_url = "https://www.pornhub.com" + check_api = f"{base_url}/api/v1/user/create_account_check" + + headers = { + "user-agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Mobile Safari/537.36", + "x-requested-with": "XMLHttpRequest", + "origin": base_url, + "referer": base_url + "/", + "content-type": "application/x-www-form-urlencoded; charset=UTF-8" + } + + async with httpx.AsyncClient(http2=True, follow_redirects=True, timeout=3) as client: + try: + landing_resp = await client.get(base_url, headers=headers) + token_match = re.search(r'var\s+token\s*=\s*"([^"]+)"', landing_resp.text) + + if not token_match: + return Result.error("Failed to extract dynamic token from HTML") + + token = token_match.group(1) + + params = {"token": token} + payload = { + "check_what": "email", + "email": email + } + + response = await client.post( + check_api, + params=params, + headers=headers, + data=payload + ) + + if response.status_code == 429: + return Result.error("Rate limited, wait for a few minutes") + + if response.status_code != 200: + return Result.error(f"HTTP Error: {response.status_code}") + + data = response.json() + status = data.get("email") + error_msg = data.get("error_message", "") + + if status == "create_account_passed": + return Result.available() + elif "already in use" in error_msg.lower() or status != "create_account_passed": + return Result.taken() + else: + return Result.error(f"Unexpected API response: {status}") + + except Exception as e: + return Result.error(e) + + +async def validate_pornhub(email: str) -> Result: + return await _check(email) diff --git a/user_scanner/email_scan/social/__init__.py b/user_scanner/email_scan/social/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/user_scanner/email_scan/social/instagram.py b/user_scanner/email_scan/social/instagram.py new file mode 100644 index 0000000..749b05b --- /dev/null +++ b/user_scanner/email_scan/social/instagram.py @@ -0,0 +1,48 @@ +import httpx +from user_scanner.core.result import Result + + +async def _check(email): + USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36" + async with httpx.AsyncClient(headers={"user-agent": USER_AGENT}, http2=True) as client: + await client.get("https://www.instagram.com/") + csrf = client.cookies.get("csrftoken") + + headers = { + "x-csrftoken": csrf, + 'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + 'Accept-Encoding': "identity", + 'sec-ch-ua-full-version-list': "\"Google Chrome\";v=\"143.0.7499.146\", \"Chromium\";v=\"143.0.7499.146\", \"Not A(Brand\";v=\"24.0.0.0\"", + 'sec-ch-ua-platform': "\"Linux\"", + 'sec-ch-ua': "\"Google Chrome\";v=\"143\", \"Chromium\";v=\"143\", \"Not A(Brand\";v=\"24\"", + 'sec-ch-ua-model': "\"\"", + 'sec-ch-ua-mobile': "?0", + 'x-ig-app-id': "936619743392459", + 'x-requested-with': "XMLHttpRequest", + 'x-instagram-ajax': "1031566424", + 'x-asbd-id': "359341", + 'x-ig-www-claim': "0", + 'sec-ch-ua-platform-version': "\"\"", + 'origin': "https://www.instagram.com", + 'referer': "https://www.instagram.com/accounts/password/reset/" + } + + response = await client.post( + "https://www.instagram.com/api/v1/web/accounts/account_recovery_send_ajax/", + data={"email_or_username": email}, + headers=headers + ) + + data = response.json() + status_val = data.get("status") + if status_val == "ok": + return Result.taken() + elif status_val == "fail": + return Result.available() + else: + return Result.error("Unexpected response body, report it on github") + + + +async def validate_instagram(email: str) -> Result: + return await _check(email) diff --git a/user_scanner/email_scan/social/mastodon.py b/user_scanner/email_scan/social/mastodon.py new file mode 100644 index 0000000..6084339 --- /dev/null +++ b/user_scanner/email_scan/social/mastodon.py @@ -0,0 +1,53 @@ +import httpx +import re +from user_scanner.core.result import Result + + +async def _check(email): + base_url = "https://mastodon.social" + signup_url = f"{base_url}/auth/sign_up" + post_url = f"{base_url}/auth" + + headers = { + "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", + "referer": "https://mastodon.social/explore", + "origin": "https://mastodon.social" + } + + async with httpx.AsyncClient(http2=True, headers=headers, follow_redirects=True) as client: + try: + initial_resp = await client.get(signup_url) + if initial_resp.status_code != 200: + return Result.error(f"Failed to access signup page: {initial_resp.status_code}") + + token_match = re.search( + r'name="csrf-token" content="([^"]+)"', initial_resp.text) + if not token_match: + return Result.error("Could not find authenticity token") + + csrf_token = token_match.group(1) + + payload = { + "authenticity_token": csrf_token, + "user[account_attributes][username]": "no3motions_robot_020102", + "user[email]": email, + "user[password]": "Theleftalone@me", + "user[password_confirmation]": "Theleftalone@me", + "user[agreement]": "1", + "button": "" + } + + response = await client.post(post_url, data=payload) + + if "has already been taken" in response.text: + return Result.taken() + else: + return Result.available() + + except Exception as e: + return Result.error(e) + + +async def validate_mastodon(email: str) -> Result: + return await _check(email) diff --git a/user_scanner/email_scan/social/x.py b/user_scanner/email_scan/social/x.py new file mode 100644 index 0000000..8a6dd94 --- /dev/null +++ b/user_scanner/email_scan/social/x.py @@ -0,0 +1,39 @@ +import httpx +from user_scanner.core.result import Result + +async def _check(email): + url = "https://api.x.com/i/users/email_available.json" + params = {"email": email} + headers = { + "user-agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Mobile Safari/537.36", + "accept-encoding": "gzip, deflate, br, zstd", + "sec-ch-ua-platform": "\"Android\"", + "sec-ch-ua": "\"Google Chrome\";v=\"143\", \"Chromium\";v=\"143\", \"Not A(Brand\";v=\"24\"", + "x-twitter-client-language": "en", + "sec-ch-ua-mobile": "?1", + "x-twitter-active-user": "yes", + "origin": "https://x.com", + "priority": "u=1, i" + } + + async with httpx.AsyncClient(http2=True) as client: + try: + response = await client.get(url, params=params, headers=headers) + + if response.status_code == 429: + return Result.error("Rate limited wait for few minutes") + + data = response.json() + taken_bool = data.get("taken") + + if taken_bool is True: + return Result.taken() + else: + return Result.available() + + except Exception as e: + return Result.error(e) + + +async def validate_x(email: str) -> Result: + return await _check(email)