Skip to content

support google login #13

@cubxxw

Description

@cubxxw

扩展登录模块以支持Google登录及其他登录方式

基于对现有代码的分析,我将提出一个可扩展的登录模块设计,支持Google登录以及未来可能添加的其他登录方式。

当前系统分析

目前的系统使用基于密码的认证方式:

  • 用户通过电子邮件和密码登录 login.py:24-43
  • 使用JWT令牌进行身份验证 security.py:15-19
  • **authenticate**函数验证邮箱和密码 crud.py:40-46

扩展设计方案

1. 创建认证提供者接口

首先,创建一个抽象的认证提供者接口,作为所有认证方式的基础:

# /backend/app/auth/providers/base.pyfrom abcimport ABC, abstractmethod
from typingimportOptional,Dict,Any

from app.modelsimport User

classAuthProvider(ABC):
    """认证提供者的抽象基类"""

    @property    @abstractmethoddefprovider_id(self) -> str:
        """提供者唯一标识"""
pass

    @abstractmethodasyncdefauthenticate(self, **credentials) ->Optional[User]:
        """根据提供的凭证进行认证,返回用户或None"""
pass

    @abstractmethoddefget_auth_url(self) -> str:
        """获取认证URL(用于OAuth流程)"""
pass

    @abstractmethodasyncdefhandle_callback(self, **params) ->Optional[User]:
        """处理OAuth回调"""
pass

2. 实现密码认证提供者

将现有的密码认证逻辑封装到一个提供者中:

# /backend/app/auth/providers/password.pyfrom typingimportOptionalfrom sqlmodelimport Session

from app.auth.providers.baseimport AuthProvider
from app.core.securityimport verify_password
from app.modelsimport User

classPasswordAuthProvider(AuthProvider):
def__init__(self, db: Session):
        self.db = db

    @propertydefprovider_id(self) -> str:
return "password"

asyncdefauthenticate(self, **credentials) ->Optional[User]:
        email = credentials.get("email")
        password = credentials.get("password")

ifnot emailornot password:
return None

        user = self.db.exec(select(User).where(User.email == email)).first()
ifnot user:
return None

ifnot verify_password(password, user.hashed_password):
return None

return user

defget_auth_url(self) -> str:
# 密码认证不需要外部URLreturn ""

asyncdefhandle_callback(self, **params) ->Optional[User]:
# 密码认证不需要回调处理return None

3. 实现Google认证提供者

# /backend/app/auth/providers/google.pyimport uuid
from typingimportOptional,Dict,Anyimport httpx
from sqlmodelimport Session, select

from app.auth.providers.baseimport AuthProvider
from app.core.configimport settings
from app.modelsimport User, UserCreate
from app.crudimport create_user

classGoogleAuthProvider(AuthProvider):
def__init__(self, db: Session):
        self.db = db
        self.client_id = settings.GOOGLE_CLIENT_ID
        self.client_secret = settings.GOOGLE_CLIENT_SECRET
        self.redirect_uri = f"{settings.BACKEND_HOST}/api/v1/auth/callback/google"

    @propertydefprovider_id(self) -> str:
return "google"

asyncdefauthenticate(self, **credentials) ->Optional[User]:
# Google认证通过handle_callback完成return None

defget_auth_url(self) -> str:
        """生成Google OAuth认证URL"""
        scope = "email profile"
return (
            f"https://accounts.google.com/o/oauth2/auth"
            f"?client_id={self.client_id}"
            f"&redirect_uri={self.redirect_uri}"
            f"&scope={scope}"
            f"&response_type=code"
        )

asyncdefhandle_callback(self, **params) ->Optional[User]:
        """处理Google OAuth回调"""
        code = params.get("code")
ifnot code:
return None

# 交换code获取access_token
        token_url = "https://oauth2.googleapis.com/token"
        token_data = {
            "code": code,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "redirect_uri": self.redirect_uri,
            "grant_type": "authorization_code"
        }

asyncwith httpx.AsyncClient()as client:
            token_response =await client.post(token_url, data=token_data)
if token_response.status_code != 200:
return None

            token_info = token_response.json()
            access_token = token_info.get("access_token")

# 使用access_token获取用户信息
            user_info_url = "https://www.googleapis.com/oauth2/v2/userinfo"
            headers = {"Authorization": f"Bearer {access_token}"}
            user_response =await client.get(user_info_url, headers=headers)

if user_response.status_code != 200:
return None

            user_info = user_response.json()

# 检查用户是否已存在
            email = user_info.get("email")
            user = self.db.exec(select(User).where(User.email == email)).first()

if user:
# 更新用户的Google信息
                self._update_user_oauth_info(user, "google", user_info)
return user

# 创建新用户return self._create_new_user(user_info)

def_update_user_oauth_info(self, user: User, provider: str, user_info:Dict[str,Any]) -> None:
        """更新用户的OAuth提供者信息"""
# 需要扩展User模型以存储OAuth提供者信息pass

def_create_new_user(self, user_info:Dict[str,Any]) -> User:
        """基于Google用户信息创建新用户"""
        email = user_info.get("email")
        name = user_info.get("name")

# 为新用户生成随机密码
        random_password = uuid.uuid4().hex

        user_in = UserCreate(
            email=email,
            password=random_password,# 用户将无法使用此密码登录,除非重置
            full_name=name,
            is_active=True
        )

        user = create_user(session=self.db, user_create=user_in)
        self._update_user_oauth_info(user, "google", user_info)
return user

4. 扩展用户模型以支持OAuth

需要扩展用户模型以存储OAuth提供者信息:

# 在 /backend/app/models.py 中添加import json
from typingimportDict,Any

classUser(UserBase, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    hashed_password: str
    items: list["Item"] = Relationship(back_populates="owner", cascade_delete=True)
# 新增字段存储OAuth信息
    oauth_providers: str = Field(default="{}", description="存储OAuth提供者信息的JSON字符串")

defget_oauth_info(self, provider: str) ->Dict[str,Any]:
        """获取指定提供者的OAuth信息"""
        providers = json.loads(self.oauth_providers)
return providers.get(provider, {})

defset_oauth_info(self, provider: str, info:Dict[str,Any]) -> None:
        """设置指定提供者的OAuth信息"""
        providers = json.loads(self.oauth_providers)
        providers[provider] = info
        self.oauth_providers = json.dumps(providers)

5. 创建认证管理器

# /backend/app/auth/manager.pyfrom typingimportDict,List,Optional,Typefrom fastapiimport Depends, HTTPException, status
from sqlmodelimport Session

from app.auth.providers.baseimport AuthProvider
from app.auth.providers.passwordimport PasswordAuthProvider
from app.auth.providers.googleimport GoogleAuthProvider
from app.api.depsimport get_db
from app.core.securityimport create_access_token
from app.core.configimport settings
from app.modelsimport User, Token
from datetimeimport timedelta

classAuthManager:
    """认证管理器,管理所有认证提供者"""

def__init__(self, db: Session):
        self.db = db
        self.providers:Dict[str, AuthProvider] = {}
        self._register_default_providers()

def_register_default_providers(self) -> None:
        """注册默认的认证提供者"""
        self.register_provider(PasswordAuthProvider(self.db))
        self.register_provider(GoogleAuthProvider(self.db))

defregister_provider(self, provider: AuthProvider) -> None:
        """注册一个认证提供者"""
        self.providers[provider.provider_id] = provider

defget_provider(self, provider_id: str) ->Optional[AuthProvider]:
        """获取指定ID的认证提供者"""
return self.providers.get(provider_id)

defget_all_providers(self) ->List[str]:
        """获取所有认证提供者的ID"""
return list(self.providers.keys())

asyncdefauthenticate(self, provider_id: str, **credentials) ->Optional[User]:
        """使用指定提供者进行认证"""
        provider = self.get_provider(provider_id)
ifnot provider:
return None

returnawait provider.authenticate(**credentials)

defget_auth_url(self, provider_id: str) -> str:
        """获取指定提供者的认证URL"""
        provider = self.get_provider(provider_id)
ifnot provider:
return ""

return provider.get_auth_url()

asyncdefhandle_callback(self, provider_id: str, **params) ->Optional[User]:
        """处理指定提供者的回调"""
        provider = self.get_provider(provider_id)
ifnot provider:
return None

returnawait provider.handle_callback(**params)

defcreate_token(self, user: User) -> Token:
        """为用户创建访问令牌"""
        access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            user.id, expires_delta=access_token_expires
        )
return Token(access_token=access_token)

# FastAPI依赖项defget_auth_manager(db: Session = Depends(get_db)) -> AuthManager:
return AuthManager(db)

6. 创建新的认证路由

# /backend/app/api/routes/auth.pyfrom typingimportDict,Listfrom fastapiimport APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.responsesimport RedirectResponse

from app.auth.managerimport AuthManager, get_auth_manager
from app.core.configimport settings
from app.modelsimport Token, UserPublic

router = APIRouter(prefix="/auth", tags=["auth"])

@router.get("/providers")asyncdeflist_auth_providers(auth_manager: AuthManager = Depends(get_auth_manager)) ->List[str]:
    """列出所有可用的认证提供者"""
return auth_manager.get_all_providers()

@router.post("/login/{provider}")asyncdeflogin(
    provider: str,
    credentials:Dict[str, str],
    auth_manager: AuthManager = Depends(get_auth_manager)
) -> Token:
    """使用指定提供者进行登录"""
    user =await auth_manager.authenticate(provider, **credentials)
ifnot user:
raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="认证失败",
        )

return auth_manager.create_token(user)

@router.get("/oauth/{provider}")asyncdefoauth_login(
    provider: str,
    auth_manager: AuthManager = Depends(get_auth_manager)
) -> RedirectResponse:
    """开始OAuth认证流程"""
    auth_url = auth_manager.get_auth_url(provider)
ifnot auth_url:
raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"提供者 {provider} 不存在或不支持OAuth",
        )

return RedirectResponse(url=auth_url)

@router.get("/callback/{provider}")asyncdefoauth_callback(
    provider: str,
    request: Request,
    auth_manager: AuthManager = Depends(get_auth_manager)
) -> RedirectResponse:
    """处理OAuth回调"""
    params = dict(request.query_params)

    user =await auth_manager.handle_callback(provider, **params)
ifnot user:
# 认证失败,重定向到前端失败页面return RedirectResponse(url=f"{settings.FRONTEND_HOST}/auth/failed")

# 生成令牌
    token = auth_manager.create_token(user)

# 重定向到前端,带上令牌return RedirectResponse(
        url=f"{settings.FRONTEND_HOST}/auth/success?token={token.access_token}"
    )

7. 更新配置以支持OAuth提供者

在 /backend/app/core/config.py 中添加Google OAuth配置:

classSettings(BaseSettings):
# ... 现有配置 ...

# Google OAuth配置
    GOOGLE_CLIENT_ID: str = ""
    GOOGLE_CLIENT_SECRET: str = ""

# 后端主机地址,用于构建回调URL
    BACKEND_HOST: str = "http://localhost:8000"

8. 更新依赖注入

在 /backend/app/api/deps.py 中添加获取认证管理器的依赖:

from app.auth.managerimport AuthManager

# ... 现有代码 ...

defget_auth_manager(session: SessionDep) -> AuthManager:
return AuthManager(session)

9. 注册新路由

在 /backend/app/api/main.py 中注册新的认证路由:

from app.api.routesimport auth

# ... 现有代码 ...

api_router.include_router(auth.router, prefix=settings.API_V1_STR)

总结

这个设计方案提供了一个可扩展的认证系统,不仅支持Google登录,还可以轻松添加其他第三方认证提供者。主要特点包括:

  1. 抽象的认证提供者接口,使得添加新的认证方式变得简单
  2. 认证管理器作为中央控制点,管理所有认证提供者
  3. 扩展的用户模型,支持存储OAuth提供者信息
  4. 统一的令牌生成和验证机制
  5. 兼容现有的密码认证方式

要添加新的认证提供者,只需要:

  1. 实现 AuthProvider 接口
  2. 在 AuthManager 中注册新提供者
  3. 添加必要的配置参数

这种设计遵循了开放/封闭原则,让系统对扩展开放,对修改封闭。

Notes

这个设计方案基于现有的FastAPI认证系统,保留了JWT令牌认证的核心机制,同时通过提供者模式扩展了认证方式。

实现时需要注意以下几点:

  1. 需要添加migrations脚本以更新用户表结构,添加OAuth信息存储字段
  2. 需要注册Google OAuth应用并获取客户端ID和密钥
  3. 前端需要实现相应的OAuth登录流程和回调处理
  4. 可能需要根据实际需求调整OAuth信息的存储方式,如使用单独的表等

Metadata

Metadata

Assignees

Labels

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions