-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Skeleton OAuth implementation #55
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,4 +21,20 @@ class Config: | |
MONGO_TOKEN = os.getenv("MONGO_TOKEN") | ||
PEXELS_TOKEN = os.getenv("PEXELS_TOKEN") | ||
DB_ENV = get_db_env() | ||
AUTH_PROVIDERS = { | ||
"google": { | ||
"client_id": os.getenv("GOOGLE_CLIENT_ID"), | ||
"client_secret": os.getenv("GOOGLE_CLIENT_SECRET"), | ||
"authorize_url": "https://accounts.google.com/o/oauth2/auth", | ||
"token_url": "https://accounts.google.com/o/oauth2/token", | ||
"user_info": { | ||
"url": "https://www.googleapis.com/oauth2/v3/userinfo", | ||
"data": lambda json: {"email": json["email"], "id": json["sub"]}, | ||
}, | ||
"scopes": ["https://www.googleapis.com/auth/userinfo.email"], | ||
} | ||
} | ||
BASE_URL = ( | ||
"https://playimaginate.com" if DB_ENV == "prod" else "http://localhost:5173" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed the local host base url to the client localhost instead of the API localhost |
||
) | ||
TESTING = False |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
from bson.objectid import ObjectId | ||
from flask_login import UserMixin | ||
from imaginate_api.extensions import login_manager | ||
from imaginate_api.extensions import db | ||
|
||
# Specification: https://flask-login.readthedocs.io/en/latest/# | ||
COLLECTION_NAME = "users" | ||
COLLECTION = db[COLLECTION_NAME] | ||
|
||
|
||
class User(UserMixin): | ||
def __init__(self, user_data=None): | ||
self.user_data = user_data or {} | ||
|
||
@property | ||
def is_authenticated(self): | ||
return self.user_data.get("authenticated", False) | ||
|
||
@property | ||
def is_active(self): | ||
return self.user_data.get("active", False) | ||
|
||
@property | ||
def is_anonymous(self): | ||
return False # Always return False based on specification | ||
|
||
def get_id(self): | ||
return str(self.user_data["_id"]) | ||
|
||
def get_clientside_data(self): | ||
return { | ||
"email": self.user_data.get("email"), | ||
} | ||
Comment on lines
+30
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added this to get data that needs to be sent to front end |
||
|
||
def authenticate_user(self): | ||
COLLECTION.update_one( | ||
{"_id": self.user_data["_id"]}, {"$set": {"authenticated": True}} | ||
) | ||
self.user_data["authenticated"] = True | ||
|
||
def deactivate_user(self): | ||
COLLECTION.update_one({"_id": self.user_data["_id"]}, {"$set": {"active": False}}) | ||
self.user_data["active"] = False | ||
|
||
@classmethod | ||
def find_or_create_user(cls, data, provider=None): | ||
# Primary identifier: Try to find the existing user by using the unique ID from the provider | ||
if provider: | ||
existing_user = COLLECTION.find_one({f"{provider}_id": data["id"]}) | ||
if existing_user: | ||
return User(user_data=existing_user) | ||
|
||
# Secondary identifier: Try to find the existing user by using the email from the provider | ||
existing_user = COLLECTION.find_one({"email": data["email"]}) | ||
if existing_user: | ||
if provider: | ||
COLLECTION.update_one( | ||
{"_id": existing_user["_id"]}, {"$set": {f"{provider}_id": data["id"]}} | ||
) | ||
return User(user_data=existing_user) | ||
|
||
# If no user is found, create a new one | ||
data["authenticated"] = False | ||
data["active"] = True | ||
data[f"{provider}_id"] = data.pop("id") | ||
new_user = COLLECTION.insert_one(data) | ||
return User.get(new_user.inserted_id) | ||
|
||
# Get user by ID | ||
@classmethod | ||
def get(cls, user_id): | ||
user = COLLECTION.find_one({"_id": ObjectId(user_id)}) | ||
if not user: | ||
return None | ||
return cls(user_data=user) | ||
|
||
|
||
# Callback function for Flask login library to load user from session user_id | ||
@login_manager.user_loader | ||
def load_user(user_id): | ||
return User.get(user_id) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
from flask import Blueprint, abort, request, redirect, url_for, session, current_app | ||
from flask_login import current_user, login_user | ||
from imaginate_api.schemas.user_info import User | ||
from http import HTTPStatus | ||
from urllib.parse import urlencode | ||
import secrets | ||
import requests | ||
|
||
bp = Blueprint("user", __name__) | ||
|
||
|
||
# Initiates the authorization process with the specified provider | ||
@bp.route("/authorize/<provider>") | ||
def user_authorize(provider): | ||
if not current_user.is_anonymous: | ||
return redirect(f'{current_app.config["BASE_URL"]}/?{urlencode(current_user.get_clientside_data())}') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed redirect here to include client-side data |
||
|
||
provider_data = current_app.config["AUTH_PROVIDERS"].get(provider) | ||
if not provider_data: | ||
abort( | ||
HTTPStatus.NOT_FOUND, | ||
description=f"Invalid provider, supports: {list(current_app.config["AUTH_PROVIDERS"].keys())}", | ||
) | ||
|
||
session["oauth_state"] = secrets.token_urlsafe(32) | ||
query = urlencode( | ||
{ | ||
"client_id": provider_data["client_id"], | ||
"redirect_uri": url_for("user.user_callback", provider=provider, _external=True), | ||
"response_type": "code", # This tells the OAuth provider that we expect an authorization code to be returned | ||
"scope": " ".join(provider_data["scopes"]), | ||
"state": session["oauth_state"], | ||
} | ||
) | ||
|
||
return redirect(f"{provider_data["authorize_url"]}?{query}") | ||
|
||
|
||
# Handles the callback (i.e. redirection response) process with the specified provider | ||
@bp.route("/callback/<provider>") | ||
def user_callback(provider): | ||
if not current_user.is_anonymous: | ||
return redirect(current_app.config["BASE_URL"]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @faisal-fawad do you think it would make sense to update this redirect like the others? |
||
|
||
provider_data = current_app.config["AUTH_PROVIDERS"].get(provider) | ||
if not provider_data: | ||
abort( | ||
HTTPStatus.NOT_FOUND, | ||
description=f"Invalid provider, supports: {list(current_app.config["AUTH_PROVIDERS"].keys())}", | ||
) | ||
|
||
# Unable to authenticate with the specified provider | ||
if "error" in request.args: | ||
for k, v in request.args.items(): | ||
if k.startswith("error"): | ||
print(f"{k}: {v}") # Debug any errors by printing them | ||
abort(HTTPStatus.BAD_REQUEST, description="Authentication error") | ||
|
||
# Authorization does not match the specification we have set | ||
if request.args["state"] != session.get("oauth_state") or "code" not in request.args: | ||
abort(HTTPStatus.BAD_REQUEST, description="Authorization error") | ||
|
||
# Get an access token from the authorization code | ||
response = requests.post( | ||
provider_data["token_url"], | ||
data={ | ||
"client_id": provider_data["client_id"], | ||
"client_secret": provider_data["client_secret"], | ||
"code": request.args["code"], | ||
"grant_type": "authorization_code", | ||
"redirect_uri": url_for("user.user_callback", provider=provider, _external=True), | ||
}, | ||
headers={"Accept": "application/json"}, | ||
) | ||
if not response.ok: | ||
abort(response.status_code, description="Authorization error") | ||
response_data = response.json() | ||
token = response_data.get("access_token") | ||
if not token: | ||
abort(HTTPStatus.UNAUTHORIZED, description="Authorization error") | ||
|
||
# Get the requested data | ||
response = requests.get( | ||
provider_data["user_info"]["url"], | ||
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"}, | ||
) | ||
if not response.ok: | ||
abort(response.status_code, description="Authorization error") | ||
|
||
# Login user and map requested data | ||
user_data = provider_data["user_info"]["data"](response.json()) | ||
user = User.find_or_create_user(user_data, provider) | ||
success = login_user(user) | ||
if success: | ||
user.authenticate_user() | ||
return redirect(f'{current_app.config["BASE_URL"]}/?{urlencode(user.get_clientside_data())}') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed the redirect here to include needed client-side data |
||
|
||
return redirect(current_app.config["BASE_URL"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added cors for ez testing. Does not matter when deployed. ( I think )