diff --git a/backend/app/routes/user_routes.py b/backend/app/routes/user_routes.py index dc864ed..cf94a87 100644 --- a/backend/app/routes/user_routes.py +++ b/backend/app/routes/user_routes.py @@ -7,12 +7,29 @@ import re import time -from flask import Blueprint, request, jsonify, make_response, session, redirect, current_app -from flask_jwt_extended import create_access_token, set_access_cookies, unset_jwt_cookies +from flask import ( + Blueprint, + request, + jsonify, + make_response, + session, + redirect, + current_app, +) +from flask_jwt_extended import ( + create_access_token, + set_access_cookies, + unset_jwt_cookies, +) import mwoauth from app.database import db -from app.middleware.auth import require_auth, require_role, handle_errors, validate_json_data +from app.middleware.auth import ( + require_auth, + require_role, + handle_errors, + validate_json_data, +) from app.models.user import User # ------------------------------------------------------------------------ @@ -25,13 +42,14 @@ _oauth_token_cache = {} # Create blueprint -user_bp = Blueprint('user', __name__) +user_bp = Blueprint("user", __name__) # ------------------------------------------------------------------------ # VALIDATION UTILITIES # ------------------------------------------------------------------------ + def validate_email(email): """ Validate email format @@ -42,9 +60,10 @@ def validate_email(email): Returns: bool: True if valid email, False otherwise """ - pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return re.match(pattern, email) is not None + def validate_username(username): """ Validate username format @@ -56,17 +75,68 @@ def validate_username(username): bool: True if valid username, False otherwise """ # Username should be 3-20 characters, alphanumeric and underscores only - pattern = r'^[a-zA-Z0-9_]{3,20}$' + pattern = r"^[a-zA-Z0-9_]{3,20}$" return re.match(pattern, username) is not None +def sort_contests_by_status(contests): + """ + Sort contests by status priority and creation date + + Sorting order: + 1. Status priority: current/active (1) → upcoming (2) → past/completed (3) + 2. Within same status: newest first (by created_at) + + Args: + contests: List of contest dictionaries + + Returns: + Sorted list of contests + """ + # Define status priority order + STATUS_ORDER = {"current": 1, "active": 1, "upcoming": 2, "past": 3, "completed": 3} + + def get_sort_key(contest): + """Generate sort key for a contest""" + # Get status priority (default to 4 for unknown status) + status = contest.get("status", "").lower() + status_priority = STATUS_ORDER.get(status, 4) + + # Get created_at timestamp for secondary sort + created_at = contest.get("created_at") + if created_at: + try: + # Handle both string and datetime objects + if isinstance(created_at, str): + from datetime import datetime + + date_obj = datetime.fromisoformat(created_at.replace("Z", "+00:00")) + timestamp = date_obj.timestamp() + elif hasattr(created_at, "timestamp"): + timestamp = created_at.timestamp() + else: + timestamp = 0 + except: + timestamp = 0 + else: + timestamp = 0 + + # Return tuple for sorting + # Negative timestamp for descending order (newest first) + return (status_priority, -timestamp) + + # Sort and return + return sorted(contests, key=get_sort_key) + + # ------------------------------------------------------------------------ # USER REGISTRATION & AUTHENTICATION # ------------------------------------------------------------------------ -@user_bp.route('/register', methods=['POST']) + +@user_bp.route("/register", methods=["POST"]) @handle_errors -@validate_json_data(['username', 'email', 'password']) +@validate_json_data(["username", "email", "password"]) def register(): """ Register a new user @@ -81,20 +151,27 @@ def register(): JSON response with success message and user ID """ data = request.validated_data - username = data['username'].strip() - email = data['email'].strip().lower() - password = data['password'] - role = data.get('role', 'user') + username = data["username"].strip() + email = data["email"].strip().lower() + password = data["password"] + role = data.get("role", "user") # --- Input Validation --- if not validate_username(username): - return jsonify({'error': 'Username must be 3-20 characters, alphanumeric and underscores only'}), 400 + return ( + jsonify( + { + "error": "Username must be 3-20 characters, alphanumeric and underscores only" + } + ), + 400, + ) if not validate_email(email): - return jsonify({'error': 'Invalid email format'}), 400 + return jsonify({"error": "Invalid email format"}), 400 if len(password) < 6: - return jsonify({'error': 'Password must be at least 6 characters long'}), 400 + return jsonify({"error": "Password must be at least 6 characters long"}), 400 # --- Role Validation --- # Allow only two roles via the public API for security: @@ -105,35 +182,40 @@ def register(): # - "superadmin" role MUST NOT be created through this endpoint. # - Superadmin accounts should ONLY be created/updated directly in the database # or via a very secure internal tool, to avoid privilege escalation. - if role not in ['user', 'admin']: - return jsonify({'error': 'Invalid role'}), 400 + if role not in ["user", "admin"]: + return jsonify({"error": "Invalid role"}), 400 # --- Uniqueness Checks --- # Check if username already exists if User.query.filter_by(username=username).first(): - return jsonify({'error': 'Username already exists'}), 400 + return jsonify({"error": "Username already exists"}), 400 # Check if email already exists if User.query.filter_by(email=email).first(): - return jsonify({'error': 'Email already exists'}), 400 + return jsonify({"error": "Email already exists"}), 400 # --- Create User --- try: user = User(username=username, email=email, password=password, role=role) user.save() - return jsonify({ - 'message': 'User created successfully', - 'userId': user.id, - 'username': user.username - }), 201 + return ( + jsonify( + { + "message": "User created successfully", + "userId": user.id, + "username": user.username, + } + ), + 201, + ) except Exception: # pylint: disable=broad-exception-caught # Log error for debugging but don't expose details to client - return jsonify({'error': 'Failed to create user'}), 500 + return jsonify({"error": "Failed to create user"}), 500 -@user_bp.route('/login', methods=['POST']) +@user_bp.route("/login", methods=["POST"]) @handle_errors def login(): """ @@ -149,20 +231,20 @@ def login(): # Get JSON data directly (not using validator for login to allow flexible error handling) data = request.get_json() if not data: - return jsonify({'error': 'No JSON data provided'}), 400 + return jsonify({"error": "No JSON data provided"}), 400 - email = data.get('email', '').strip().lower() - password = data.get('password', '') + email = data.get("email", "").strip().lower() + password = data.get("password", "") if not email or not password: - return jsonify({'error': 'Email and password required'}), 400 + return jsonify({"error": "Email and password required"}), 400 # --- Authenticate User --- # Find user by email user = User.query.filter_by(email=email).first() if not user or not user.check_password(password): - return jsonify({'error': 'Invalid email or password'}), 401 + return jsonify({"error": "Invalid email or password"}), 401 try: # Create JWT token for authenticated session @@ -171,12 +253,16 @@ def login(): # Create response with user info # NOTE: we also include the user's role so frontend can know if they are # admin / superadmin and adjust UI (like delete buttons) accordingly. - response = make_response(jsonify({ - 'message': 'Login successful', - 'userId': user.id, - 'username': user.username, - 'role': user.role - })) + response = make_response( + jsonify( + { + "message": "Login successful", + "userId": user.id, + "username": user.username, + "role": user.role, + } + ) + ) # Set JWT token in HTTP-only cookie for security set_access_cookies(response, access_token) @@ -186,10 +272,10 @@ def login(): except Exception as error: # pylint: disable=broad-exception-caught # Log error for debugging current_app.logger.error(f"Error in login process: {str(error)}") - return jsonify({'error': 'Login failed'}), 500 + return jsonify({"error": "Login failed"}), 500 -@user_bp.route('/logout', methods=['POST']) +@user_bp.route("/logout", methods=["POST"]) @handle_errors def logout(): """ @@ -204,11 +290,11 @@ def logout(): JSON response with success message """ # Clear OAuth session data if present (for OAuth users) - session.pop('request_token', None) - session.pop('request_secret', None) + session.pop("request_token", None) + session.pop("request_secret", None) # Clear JWT token cookie (works for both regular and OAuth users) - response = make_response(jsonify({'message': 'Logout successful'})) + response = make_response(jsonify({"message": "Logout successful"})) # Unset JWT cookies using Flask-JWT-Extended helper unset_jwt_cookies(response) @@ -217,24 +303,24 @@ def logout(): # This is important for localhost:5000 -> localhost:5173 scenarios # where cookies need to work across different development server ports response.set_cookie( - 'access_token_cookie', - value='', + "access_token_cookie", + value="", expires=0, httponly=True, - samesite='Lax', + samesite="Lax", secure=False, domain=None, # None allows cookie to work across localhost ports - path='/' + path="/", ) response.set_cookie( - 'csrf_access_token', - value='', + "csrf_access_token", + value="", expires=0, httponly=True, - samesite='Lax', + samesite="Lax", secure=False, domain=None, - path='/' + path="/", ) return response, 200 @@ -244,13 +330,18 @@ def logout(): # USER DASHBOARD & PROFILE # ------------------------------------------------------------------------ + @user_bp.route('/dashboard', methods=['GET']) @require_auth @handle_errors def get_dashboard(): """ - Get user dashboard data - + Get user dashboard data with SORTED contests + + Contests are sorted by: + 1. Status priority: current/active → upcoming → past/completed + 2. Created date (newest first) within same status + Returns: JSON response with user's dashboard information """ @@ -293,15 +384,18 @@ def get_dashboard(): } submissions_by_contest[contest_id]['submissions'].append(submission.to_dict()) - # --- Get Contests Created by User --- + # --- Get Contests Created by User --- created_contests = Contest.query.filter_by(created_by=user.username).all() created_contests_data = [] for contest in created_contests: contest_data = contest.to_dict() contest_data['submission_count'] = contest.get_submission_count() created_contests_data.append(contest_data) + + # SORT CREATED CONTESTS + created_contests_data = sort_contests_by_status(created_contests_data) - # --- Get Contests Where User is a Jury Member --- + # --- Get Contests Where User is a Jury Member (WITH SORTING) --- jury_contests = Contest.query.filter( Contest.jury_members.like(f'%{user.username}%') ).all() @@ -310,6 +404,9 @@ def get_dashboard(): contest_data = contest.to_dict() contest_data['submission_count'] = contest.get_submission_count() jury_contests_data.append(contest_data) + + # SORT JURY CONTESTS + jury_contests_data = sort_contests_by_status(jury_contests_data) return jsonify({ 'username': user.username, @@ -328,9 +425,8 @@ def get_dashboard(): 'jury_contests': jury_contests_data }), 200 - -@user_bp.route('/all', methods=['GET']) -@require_role('admin') +@user_bp.route("/all", methods=["GET"]) +@require_role("admin") @handle_errors def get_all_users(): """ @@ -343,7 +439,7 @@ def get_all_users(): return jsonify([user.to_dict() for user in users]), 200 -@user_bp.route('/profile', methods=['GET']) +@user_bp.route("/profile", methods=["GET"]) @require_auth @handle_errors def get_profile(): @@ -357,10 +453,10 @@ def get_profile(): return jsonify(user.to_dict()), 200 -@user_bp.route('/profile', methods=['PUT']) +@user_bp.route("/profile", methods=["PUT"]) @require_auth @handle_errors -@validate_json_data(['username', 'email']) +@validate_json_data(["username", "email"]) def update_profile(): """ Update current user's profile @@ -375,46 +471,52 @@ def update_profile(): user = request.current_user data = request.validated_data - new_username = data['username'].strip() - new_email = data['email'].strip().lower() + new_username = data["username"].strip() + new_email = data["email"].strip().lower() # --- Input Validation --- if not validate_username(new_username): - return jsonify({'error': 'Username must be 3-20 characters, alphanumeric and underscores only'}), 400 + return ( + jsonify( + { + "error": "Username must be 3-20 characters, alphanumeric and underscores only" + } + ), + 400, + ) if not validate_email(new_email): - return jsonify({'error': 'Invalid email format'}), 400 + return jsonify({"error": "Invalid email format"}), 400 # --- Uniqueness Checks --- # Check if username is already taken by another user existing_user = User.query.filter( - User.username == new_username, - User.id != user.id + User.username == new_username, User.id != user.id ).first() if existing_user: - return jsonify({'error': 'Username already exists'}), 400 + return jsonify({"error": "Username already exists"}), 400 # Check if email is already taken by another user existing_email = User.query.filter( - User.email == new_email, - User.id != user.id + User.email == new_email, User.id != user.id ).first() if existing_email: - return jsonify({'error': 'Email already exists'}), 400 + return jsonify({"error": "Email already exists"}), 400 # --- Update User Data --- user.username = new_username user.email = new_email user.save() - return jsonify({'message': 'Profile updated successfully'}), 200 + return jsonify({"message": "Profile updated successfully"}), 200 # ------------------------------------------------------------------------ # OAUTH ROUTES (Wikimedia OAuth 1.0a) # ------------------------------------------------------------------------ -@user_bp.route('/oauth/login', methods=['GET']) + +@user_bp.route("/oauth/login", methods=["GET"]) @handle_errors def oauth_login(): """ @@ -429,19 +531,28 @@ def oauth_login(): # --- Get OAuth Configuration --- # Get OAuth 1.0a configuration from app config (loaded from .env file) # These values come from the .env file: CONSUMER_KEY, CONSUMER_SECRET, OAUTH_MWURI - consumer_key = current_app.config.get('CONSUMER_KEY') - consumer_secret = current_app.config.get('CONSUMER_SECRET') - mw_uri = current_app.config.get('OAUTH_MWURI', 'https://meta.wikimedia.org/w/index.php') + consumer_key = current_app.config.get("CONSUMER_KEY") + consumer_secret = current_app.config.get("CONSUMER_SECRET") + mw_uri = current_app.config.get( + "OAUTH_MWURI", "https://meta.wikimedia.org/w/index.php" + ) # Check if OAuth is configured if not consumer_key or not consumer_secret: - return jsonify({ - 'error': 'OAuth not configured. Please set CONSUMER_KEY and CONSUMER_SECRET in .env file' - }), 500 + return ( + jsonify( + { + "error": "OAuth not configured. Please set CONSUMER_KEY and CONSUMER_SECRET in .env file" + } + ), + 500, + ) # Log OAuth configuration for debugging - current_app.logger.info(f'OAuth login initiated - Consumer Key: {consumer_key[:10]}...') - current_app.logger.info(f'OAuth MW URI: {mw_uri}') + current_app.logger.info( + f"OAuth login initiated - Consumer Key: {consumer_key[:10]}..." + ) + current_app.logger.info(f"OAuth MW URI: {mw_uri}") try: # --- Build Callback URL --- @@ -453,19 +564,19 @@ def oauth_login(): # For local development, ensure we use 'localhost' not '127.0.0.1' # OAuth consumer is registered with 'localhost:5000', so we must use that exact format - if '127.0.0.1' in host or (host.startswith('localhost') and ':' not in host): + if "127.0.0.1" in host or (host.startswith("localhost") and ":" not in host): # Extract port if present - port = ':5000' # Default port - if ':' in host: - port = ':' + host.split(':')[1] + port = ":5000" # Default port + if ":" in host: + port = ":" + host.split(":")[1] # Force localhost for local development to match OAuth consumer registration - host = f'localhost{port}' + host = f"localhost{port}" # Check if we should use a custom callback path (e.g., for Toolforge) # Toolforge OAuth consumer is registered with /oauth/callback # Regular deployment uses /api/user/oauth/callback # For local development: http://localhost:5000/api/user/oauth/callback - custom_callback_path = current_app.config.get('OAUTH_CALLBACK_PATH', None) + custom_callback_path = current_app.config.get("OAUTH_CALLBACK_PATH", None) if custom_callback_path: # Use custom callback path (e.g., /oauth/callback for Toolforge) callback_url = f"{scheme}://{host}{custom_callback_path}" @@ -475,9 +586,9 @@ def oauth_login(): callback_url = f"{scheme}://{host}/api/user/oauth/callback" # Log the exact callback URL being used for debugging - current_app.logger.info(f'Built callback URL: {callback_url}') + current_app.logger.info(f"Built callback URL: {callback_url}") current_app.logger.info( - f'Request host: {request.host}, Scheme: {scheme}, Final host: {host}' + f"Request host: {request.host}, Scheme: {scheme}, Final host: {host}" ) # --- Determine Callback Parameter --- @@ -485,25 +596,25 @@ def oauth_login(): # If your OAuth consumer was registered with "oob", you must use "oob" here # Otherwise, use the callback URL that matches your registration # Most web applications should register with a callback URL, not "oob" - use_oob = current_app.config.get('OAUTH_USE_OOB', False) + use_oob = current_app.config.get("OAUTH_USE_OOB", False) if use_oob: # Use "oob" for out-of-band (manual verification code entry) # This is required if OAuth consumer was registered with "oob" callback_param = "oob" - current_app.logger.info('Using OAuth callback: oob (out-of-band)') + current_app.logger.info("Using OAuth callback: oob (out-of-band)") current_app.logger.warning( 'OAuth consumer registered with "oob" - ' - 'user will need to manually enter verification code' + "user will need to manually enter verification code" ) else: # Use the callback URL for automatic redirect # This must match exactly what was registered in OAuth consumer callback_param = callback_url - current_app.logger.info(f'Using OAuth callback URL: {callback_url}') + current_app.logger.info(f"Using OAuth callback URL: {callback_url}") current_app.logger.info( - f'IMPORTANT: Make sure your OAuth consumer is registered with ' - f'this exact callback URL: {callback_url}' + f"IMPORTANT: Make sure your OAuth consumer is registered with " + f"this exact callback URL: {callback_url}" ) # --- Initiate OAuth Flow --- @@ -514,26 +625,28 @@ def oauth_login(): # The callback parameter is required and must match OAuth consumer registration exactly # If registered with "oob", use "oob". If registered with URL, use that exact URL redirect_url, request_token = mwoauth.initiate( - mw_uri, - consumer_token, - callback=callback_param + mw_uri, consumer_token, callback=callback_param ) # --- Store Request Token --- # Store request token in session for later verification - session['request_token'] = request_token.key - session['request_secret'] = request_token.secret + session["request_token"] = request_token.key + session["request_secret"] = request_token.secret # Also store in temporary cache as backup (in case session cookies don't persist) # This helps when redirecting to external sites where cookies might not work _oauth_token_cache[request_token.key] = { - 'secret': request_token.secret, - 'timestamp': time.time() + "secret": request_token.secret, + "timestamp": time.time(), } # Clean up old cache entries (older than 10 minutes) current_time = time.time() - expired_keys = [k for k, v in _oauth_token_cache.items() if current_time - v['timestamp'] > 600] + expired_keys = [ + k + for k, v in _oauth_token_cache.items() + if current_time - v["timestamp"] > 600 + ] for key in expired_keys: _oauth_token_cache.pop(key, None) @@ -543,9 +656,11 @@ def oauth_login(): session.modified = True # Mark session as modified to ensure it's saved # Log session storage for debugging - current_app.logger.info(f'Session stored - request_token: {request_token.key[:10]}...') - current_app.logger.info(f'Session keys: {list(session.keys())}') - current_app.logger.info('Token also cached as backup') + current_app.logger.info( + f"Session stored - request_token: {request_token.key[:10]}..." + ) + current_app.logger.info(f"Session keys: {list(session.keys())}") + current_app.logger.info("Token also cached as backup") # Create response with redirect to ensure session cookie is set response = make_response(redirect(redirect_url)) @@ -555,14 +670,14 @@ def oauth_login(): except Exception as error: # pylint: disable=broad-exception-caught # OAuth can fail in many ways, so we catch all exceptions - current_app.logger.error(f'OAuth initiation error: {str(error)}') - return jsonify({ - 'error': 'Failed to initiate OAuth login', - 'details': str(error) - }), 500 + current_app.logger.error(f"OAuth initiation error: {str(error)}") + return ( + jsonify({"error": "Failed to initiate OAuth login", "details": str(error)}), + 500, + ) -@user_bp.route('/oauth/callback', methods=['GET']) +@user_bp.route("/oauth/callback", methods=["GET"]) @handle_errors def oauth_callback(): """ @@ -581,17 +696,19 @@ def oauth_callback(): # --- Get OAuth Configuration --- # Get OAuth 1.0a configuration from app config (loaded from .env file) # These values come from the .env file: CONSUMER_KEY, CONSUMER_SECRET, OAUTH_MWURI - consumer_key = current_app.config.get('CONSUMER_KEY') - consumer_secret = current_app.config.get('CONSUMER_SECRET') - mw_uri = current_app.config.get('OAUTH_MWURI', 'https://meta.wikimedia.org/w/index.php') + consumer_key = current_app.config.get("CONSUMER_KEY") + consumer_secret = current_app.config.get("CONSUMER_SECRET") + mw_uri = current_app.config.get( + "OAUTH_MWURI", "https://meta.wikimedia.org/w/index.php" + ) # --- Get OAuth Parameters from Callback --- - oauth_verifier = request.args.get('oauth_verifier') - oauth_token = request.args.get('oauth_token') + oauth_verifier = request.args.get("oauth_verifier") + oauth_token = request.args.get("oauth_token") # Get stored request token from session (primary method) - request_token_key = session.get('request_token') - request_secret = session.get('request_secret') + request_token_key = session.get("request_token") + request_secret = session.get("request_secret") # --- Fallback to Cache if Session Failed --- # If session doesn't have the token, try to get it from cache (fallback) @@ -601,41 +718,48 @@ def oauth_callback(): cached_data = _oauth_token_cache.get(oauth_token) if cached_data: request_token_key = oauth_token - request_secret = cached_data['secret'] - current_app.logger.info('Retrieved OAuth token from cache (session cookie failed)') + request_secret = cached_data["secret"] + current_app.logger.info( + "Retrieved OAuth token from cache (session cookie failed)" + ) # Clean up cache entry after use _oauth_token_cache.pop(oauth_token, None) # Log session data for debugging current_app.logger.info( - f'OAuth callback received - oauth_token: {oauth_token}, ' - f'oauth_verifier: {oauth_verifier}' + f"OAuth callback received - oauth_token: {oauth_token}, " + f"oauth_verifier: {oauth_verifier}" ) current_app.logger.info( - f'Session data - request_token_key: {request_token_key}, ' - f'request_secret: {bool(request_secret)}' + f"Session data - request_token_key: {request_token_key}, " + f"request_secret: {bool(request_secret)}" ) - current_app.logger.info(f'Session keys: {list(session.keys())}') + current_app.logger.info(f"Session keys: {list(session.keys())}") # --- Validate Callback Parameters --- if not oauth_verifier or not oauth_token: - return jsonify({'error': 'Missing OAuth parameters'}), 400 + return jsonify({"error": "Missing OAuth parameters"}), 400 if not request_token_key or not request_secret: # Provide more detailed error message for debugging - current_app.logger.error('OAuth session expired - session data missing') - current_app.logger.error(f'Available session keys: {list(session.keys())}') - current_app.logger.error(f'Cache keys: {list(_oauth_token_cache.keys())}') - return jsonify({ - 'error': 'OAuth session expired. Please try again.', - 'details': ( - 'Session data was not found. Make sure cookies are enabled ' - 'and you\'re using the same browser session.' - ) - }), 400 + current_app.logger.error("OAuth session expired - session data missing") + current_app.logger.error(f"Available session keys: {list(session.keys())}") + current_app.logger.error(f"Cache keys: {list(_oauth_token_cache.keys())}") + return ( + jsonify( + { + "error": "OAuth session expired. Please try again.", + "details": ( + "Session data was not found. Make sure cookies are enabled " + "and you're using the same browser session." + ), + } + ), + 400, + ) if oauth_token != request_token_key: - return jsonify({'error': 'Invalid OAuth token'}), 400 + return jsonify({"error": "Invalid OAuth token"}), 400 try: # --- Exchange Request Token for Access Token --- @@ -649,24 +773,31 @@ def oauth_callback(): response_qs = request.query_string # Log parameters before calling complete - current_app.logger.info(f'Calling mwoauth.complete with query string: {response_qs.decode("utf-8")}') - current_app.logger.info(f'oauth_verifier: {oauth_verifier}, oauth_token: {oauth_token}') + current_app.logger.info( + f'Calling mwoauth.complete with query string: {response_qs.decode("utf-8")}' + ) + current_app.logger.info( + f"oauth_verifier: {oauth_verifier}, oauth_token: {oauth_token}" + ) access_token = mwoauth.complete( mw_uri, consumer_token, request_token, - response_qs # Pass the full query string as bytes (not decoded) + response_qs, # Pass the full query string as bytes (not decoded) ) # --- Get User Identity from Wikimedia --- identity = mwoauth.identify(mw_uri, consumer_token, access_token) # Extract user information - username = identity.get('username', '') + username = identity.get("username", "") if not username: - return jsonify({'error': 'Failed to get user information from Wikimedia'}), 500 + return ( + jsonify({"error": "Failed to get user information from Wikimedia"}), + 500, + ) # --- Find or Create User in Database --- # Use username as the unique identifier @@ -677,13 +808,14 @@ def oauth_callback(): # OAuth users don't need a password, but User model requires one # Generate a random secure password that will never be used import secrets + random_password = secrets.token_urlsafe(32) # User.__init__ will automatically hash the password via set_password user = User( username=username, - email=f'{username}@wikimedia.oauth', # Placeholder email + email=f"{username}@wikimedia.oauth", # Placeholder email password=random_password, # Random password (OAuth users won't use it) - role='user' + role="user", ) # Store OAuth tokens for MediaWiki API editing (template enforcement) user.oauth_token = access_token.key @@ -702,8 +834,8 @@ def oauth_callback(): access_token_jwt = create_access_token(identity=str(user.id)) # Clear OAuth session data - session.pop('request_token', None) - session.pop('request_secret', None) + session.pop("request_token", None) + session.pop("request_secret", None) # --- Determine Redirect URL --- # IMPORTANT: OAuth callback URL is fixed at http://localhost:5000/api/user/oauth/callback @@ -711,7 +843,7 @@ def oauth_callback(): # This ensures the Vue.js app can process the oauth_success parameter # Check for frontend URL in environment variable (for production) - frontend_url = current_app.config.get('FRONTEND_URL') + frontend_url = current_app.config.get("FRONTEND_URL") if frontend_url: # Production: use configured frontend URL @@ -719,7 +851,7 @@ def oauth_callback(): else: # Development: Always redirect to Vue.js dev server # This ensures the Vue.js app loads properly and can process OAuth callback - redirect_url = 'http://localhost:5173/?oauth_success=true' + redirect_url = "http://localhost:5173/?oauth_success=true" # --- Create Response with JWT Cookie --- # Create response with redirect to frontend @@ -737,18 +869,19 @@ def oauth_callback(): except Exception as error: # pylint: disable=broad-exception-caught # OAuth can fail in many ways, so we catch all exceptions - current_app.logger.error(f'OAuth callback error: {str(error)}') - return jsonify({ - 'error': 'OAuth authentication failed', - 'details': str(error) - }), 500 + current_app.logger.error(f"OAuth callback error: {str(error)}") + return ( + jsonify({"error": "OAuth authentication failed", "details": str(error)}), + 500, + ) # ------------------------------------------------------------------------ # USER SEARCH & LOOKUP # ------------------------------------------------------------------------ -@user_bp.route('/search', methods=['GET']) + +@user_bp.route("/search", methods=["GET"]) @handle_errors def search_users(): """ @@ -761,24 +894,25 @@ def search_users(): Returns: JSON response with list of matching usernames """ - query = request.args.get('q', '').strip() - limit = request.args.get('limit', 10, type=int) + query = request.args.get("q", "").strip() + limit = request.args.get("limit", 10, type=int) # Require at least 2 characters for search if not query or len(query) < 2: - return jsonify({'users': []}), 200 + return jsonify({"users": []}), 200 # Search users whose username starts with or contains the query - users = User.query.filter( - User.username.ilike(f'%{query}%') - ).limit(limit).all() + users = User.query.filter(User.username.ilike(f"%{query}%")).limit(limit).all() - return jsonify({ - 'users': [{'username': user.username, 'id': user.id} for user in users] - }), 200 + return ( + jsonify( + {"users": [{"username": user.username, "id": user.id} for user in users]} + ), + 200, + ) -@user_bp.route('//username', methods=['GET']) +@user_bp.route("//username", methods=["GET"]) @handle_errors def get_user_username(user_id): """ @@ -796,20 +930,18 @@ def get_user_username(user_id): user = User.query.get(user_id) if not user: - return jsonify({'error': 'User not found'}), 404 + return jsonify({"error": "User not found"}), 404 # Return only non-sensitive user information - return jsonify({ - 'id': user.id, - 'username': user.username - }), 200 + return jsonify({"id": user.id, "username": user.username}), 200 # ------------------------------------------------------------------------ # TRUSTED MEMBER MANAGEMENT ROUTES # ------------------------------------------------------------------------ -@user_bp.route('/trusted-members/request', methods=['POST']) + +@user_bp.route("/trusted-members/request", methods=["POST"]) @require_auth @handle_errors def request_trusted_member(): @@ -835,57 +967,83 @@ def request_trusted_member(): # Superadmins are automatically trusted members, no need to request if user.is_superadmin(): - return jsonify({ - 'error': 'Superadmins are automatically trusted members and can create contests directly' - }), 400 + return ( + jsonify( + { + "error": "Superadmins are automatically trusted members and can create contests directly" + } + ), + 400, + ) # Check if already a trusted member - if getattr(user, 'is_trusted_member', False): - return jsonify({ - 'error': 'You are already a trusted member' - }), 400 + if getattr(user, "is_trusted_member", False): + return jsonify({"error": "You are already a trusted member"}), 400 # Check if already requested - if getattr(user, 'trusted_member_request', False): - return jsonify({ - 'error': 'You have already requested trusted member status. Please wait for approval.' - }), 400 + if getattr(user, "trusted_member_request", False): + return ( + jsonify( + { + "error": "You have already requested trusted member status. Please wait for approval." + } + ), + 400, + ) # Check if user logged in via MediaWiki OAuth # Only MediaWiki OAuth users can request creator accounts if not user.oauth_token or not user.oauth_token_secret: - return jsonify({ - 'error': 'Only users who logged in via MediaWiki can request creator accounts. Please log in using MediaWiki OAuth.' - }), 400 + return ( + jsonify( + { + "error": "Only users who logged in via MediaWiki can request creator accounts. Please log in using MediaWiki OAuth." + } + ), + 400, + ) # Get MediaWiki URI from config - mw_uri = current_app.config.get('OAUTH_MWURI', 'https://meta.wikimedia.org/w/index.php') + mw_uri = current_app.config.get( + "OAUTH_MWURI", "https://meta.wikimedia.org/w/index.php" + ) # Get user's edit count from MediaWiki API from app.utils import get_mediawiki_user_edit_count + edit_count = get_mediawiki_user_edit_count(user.username, mw_uri) # If we couldn't fetch edit count, require reason (safer default) if edit_count is None: # Require reason if edit count cannot be determined data = request.get_json() or {} - reason = data.get('reason', '').strip() + reason = data.get("reason", "").strip() if not reason: - return jsonify({ - 'error': 'Could not verify your edit count. Please provide a reason for requesting creator account status.', - 'requires_reason': True - }), 400 + return ( + jsonify( + { + "error": "Could not verify your edit count. Please provide a reason for requesting creator account status.", + "requires_reason": True, + } + ), + 400, + ) # Store request with reason for superadmin review user.trusted_member_request = True user.trusted_member_request_reason = reason user.save() - return jsonify({ - 'message': 'Creator account request submitted successfully. A superadmin will review your request.', - 'auto_approved': False - }), 200 + return ( + jsonify( + { + "message": "Creator account request submitted successfully. A superadmin will review your request.", + "auto_approved": False, + } + ), + 200, + ) # Check edit count threshold (300 edits) MIN_EDIT_COUNT = 300 @@ -897,38 +1055,53 @@ def request_trusted_member(): user.trusted_member_request_reason = None # Clear any previous reason user.save() - return jsonify({ - 'message': f'Congratulations! You have {edit_count} edits. Your creator account has been automatically approved.', - 'auto_approved': True, - 'edit_count': edit_count - }), 200 + return ( + jsonify( + { + "message": f"Congratulations! You have {edit_count} edits. Your creator account has been automatically approved.", + "auto_approved": True, + "edit_count": edit_count, + } + ), + 200, + ) # User has < 300 edits: require reason for superadmin review data = request.get_json() or {} - reason = data.get('reason', '').strip() + reason = data.get("reason", "").strip() if not reason: - return jsonify({ - 'error': f'You have {edit_count} edits, which is below the minimum of {MIN_EDIT_COUNT} edits for automatic approval. Please provide a reason for requesting creator account status.', - 'requires_reason': True, - 'edit_count': edit_count, - 'min_edit_count': MIN_EDIT_COUNT - }), 400 + return ( + jsonify( + { + "error": f"You have {edit_count} edits, which is below the minimum of {MIN_EDIT_COUNT} edits for automatic approval. Please provide a reason for requesting creator account status.", + "requires_reason": True, + "edit_count": edit_count, + "min_edit_count": MIN_EDIT_COUNT, + } + ), + 400, + ) # Store request with reason for superadmin review user.trusted_member_request = True user.trusted_member_request_reason = reason user.save() - return jsonify({ - 'message': f'Your creator account request has been submitted for review. You have {edit_count} edits (minimum {MIN_EDIT_COUNT} for automatic approval). A superadmin will review your request.', - 'auto_approved': False, - 'edit_count': edit_count - }), 200 + return ( + jsonify( + { + "message": f"Your creator account request has been submitted for review. You have {edit_count} edits (minimum {MIN_EDIT_COUNT} for automatic approval). A superadmin will review your request.", + "auto_approved": False, + "edit_count": edit_count, + } + ), + 200, + ) -@user_bp.route('/trusted-members/requests', methods=['GET']) -@require_role('superadmin') +@user_bp.route("/trusted-members/requests", methods=["GET"]) +@require_role("superadmin") @handle_errors def get_trusted_member_requests(): """ @@ -940,23 +1113,39 @@ def get_trusted_member_requests(): JSON response with list of pending requests """ # Get all users with pending requests - requests = User.query.filter_by(trusted_member_request=True, is_trusted_member=False).all() + requests = User.query.filter_by( + trusted_member_request=True, is_trusted_member=False + ).all() - return jsonify({ - 'requests': [{ - 'id': user.id, - 'username': user.username, - 'email': user.email, - 'role': user.role, - 'created_at': user.created_at.isoformat() if user.created_at else None, - 'requested_at': user.created_at.isoformat() if user.created_at else None, # Using created_at as proxy - 'request_reason': getattr(user, 'trusted_member_request_reason', None) # Include reason for review - } for user in requests] - }), 200 + return ( + jsonify( + { + "requests": [ + { + "id": user.id, + "username": user.username, + "email": user.email, + "role": user.role, + "created_at": ( + user.created_at.isoformat() if user.created_at else None + ), + "requested_at": ( + user.created_at.isoformat() if user.created_at else None + ), # Using created_at as proxy + "request_reason": getattr( + user, "trusted_member_request_reason", None + ), # Include reason for review + } + for user in requests + ] + } + ), + 200, + ) -@user_bp.route('/trusted-members', methods=['GET']) -@require_role('superadmin') +@user_bp.route("/trusted-members", methods=["GET"]) +@require_role("superadmin") @handle_errors def get_trusted_members(): """ @@ -971,25 +1160,35 @@ def get_trusted_members(): trusted_members = User.query.filter_by(is_trusted_member=True).all() # Also include superadmins in the list - superadmins = User.query.filter_by(role='superadmin').all() + superadmins = User.query.filter_by(role="superadmin").all() # Combine and deduplicate all_trusted = {user.id: user for user in trusted_members + superadmins} - return jsonify({ - 'trusted_members': [{ - 'id': user.id, - 'username': user.username, - 'email': user.email, - 'role': user.role, - 'is_superadmin': user.is_superadmin(), - 'created_at': user.created_at.isoformat() if user.created_at else None - } for user in all_trusted.values()] - }), 200 + return ( + jsonify( + { + "trusted_members": [ + { + "id": user.id, + "username": user.username, + "email": user.email, + "role": user.role, + "is_superadmin": user.is_superadmin(), + "created_at": ( + user.created_at.isoformat() if user.created_at else None + ), + } + for user in all_trusted.values() + ] + } + ), + 200, + ) -@user_bp.route('/trusted-members//approve', methods=['POST']) -@require_role('superadmin') +@user_bp.route("/trusted-members//approve", methods=["POST"]) +@require_role("superadmin") @handle_errors def approve_trusted_member(user_id): """ @@ -1007,13 +1206,11 @@ def approve_trusted_member(user_id): user = User.query.get(user_id) if not user: - return jsonify({'error': 'User not found'}), 404 + return jsonify({"error": "User not found"}), 404 # Superadmins are automatically trusted, no need to approve if user.is_superadmin(): - return jsonify({ - 'error': 'Superadmins are automatically trusted members' - }), 400 + return jsonify({"error": "Superadmins are automatically trusted members"}), 400 # Approve the request user.is_trusted_member = True @@ -1021,13 +1218,16 @@ def approve_trusted_member(user_id): user.trusted_member_request_reason = None # Clear the reason user.save() - return jsonify({ - 'message': f'User {user.username} has been approved as a trusted member' - }), 200 + return ( + jsonify( + {"message": f"User {user.username} has been approved as a trusted member"} + ), + 200, + ) -@user_bp.route('/trusted-members//reject', methods=['POST']) -@require_role('superadmin') +@user_bp.route("/trusted-members//reject", methods=["POST"]) +@require_role("superadmin") @handle_errors def reject_trusted_member(user_id): """ @@ -1045,20 +1245,23 @@ def reject_trusted_member(user_id): user = User.query.get(user_id) if not user: - return jsonify({'error': 'User not found'}), 404 + return jsonify({"error": "User not found"}), 404 # Clear the request flag and reason (rejection) user.trusted_member_request = False user.trusted_member_request_reason = None user.save() - return jsonify({ - 'message': f'Trusted member request for {user.username} has been rejected' - }), 200 + return ( + jsonify( + {"message": f"Trusted member request for {user.username} has been rejected"} + ), + 200, + ) -@user_bp.route('/trusted-members//add', methods=['POST']) -@require_role('superadmin') +@user_bp.route("/trusted-members//add", methods=["POST"]) +@require_role("superadmin") @handle_errors def add_trusted_member(user_id): """ @@ -1076,13 +1279,11 @@ def add_trusted_member(user_id): user = User.query.get(user_id) if not user: - return jsonify({'error': 'User not found'}), 404 + return jsonify({"error": "User not found"}), 404 # Superadmins are automatically trusted, no need to add if user.is_superadmin(): - return jsonify({ - 'error': 'Superadmins are automatically trusted members' - }), 400 + return jsonify({"error": "Superadmins are automatically trusted members"}), 400 # Add as trusted member user.is_trusted_member = True @@ -1090,13 +1291,16 @@ def add_trusted_member(user_id): user.trusted_member_request_reason = None # Clear any reason user.save() - return jsonify({ - 'message': f'User {user.username} has been added as a trusted member' - }), 200 + return ( + jsonify( + {"message": f"User {user.username} has been added as a trusted member"} + ), + 200, + ) -@user_bp.route('/trusted-members//remove', methods=['POST']) -@require_role('superadmin') +@user_bp.route("/trusted-members//remove", methods=["POST"]) +@require_role("superadmin") @handle_errors def remove_trusted_member(user_id): """ @@ -1114,18 +1318,22 @@ def remove_trusted_member(user_id): user = User.query.get(user_id) if not user: - return jsonify({'error': 'User not found'}), 404 + return jsonify({"error": "User not found"}), 404 # Cannot remove superadmin status (they're automatically trusted) if user.is_superadmin(): - return jsonify({ - 'error': 'Cannot remove trusted member status from superadmin' - }), 400 + return ( + jsonify({"error": "Cannot remove trusted member status from superadmin"}), + 400, + ) # Remove trusted member status user.is_trusted_member = False user.save() - return jsonify({ - 'message': f'User {user.username} has been removed from trusted members' - }), 200 \ No newline at end of file + return ( + jsonify( + {"message": f"User {user.username} has been removed from trusted members"} + ), + 200, + )