Skip to content

Commit

Permalink
v.1.5.0: fastapi extension.
Browse files Browse the repository at this point in the history
  • Loading branch information
ALittleMoron committed May 16, 2024
1 parent 058a1b5 commit 7856ae4
Show file tree
Hide file tree
Showing 10 changed files with 1,366 additions and 165 deletions.
104 changes: 104 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,107 @@ class YourUnitOfWork(BaseAsyncUnitOfWork):

and this will cause no commit or other session manipulation (except session create for repositories
work).

## Extensions

v1.5.0 now provided extensions for other technologies like web-frameworks. Now only FastAPI is
supported.

### FastAPI

FastAPI extensions implements base classes for services and container, so you can work with your
code easier.

First of all You need to prepare all to work with plugin:

```python
from functools import cached_property

from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker

from sqlrepo import BaseSyncRepository

engine = create_engine("<your-db-url-here>")
Session = sessionmaker(engine)


class Base(DeclarativeBase): ...


class YourModel(Base):
# Your model definition
...


def get_session():
with Session() as session:
yield session


app = FastAPI()
```

then you should use plugin like this:

```python
# your prepared code below

from sqlrepo.ext.fastapi import add_container_overrides
add_container_overrides(app, get_session)
```

then you can implements containers and services like this:

```python
# your prepared code below

from pydantic import BaseModel, ConfigDict

from sqlrepo.ext.fastapi import BaseSyncContainer, BaseSyncService


class YourModelDetail(BaseModel):
model_config = ConfigDict(from_attributes=True)
...


class YourModelList(BaseModel):
model_config = ConfigDict(from_attributes=True)
...


class YourModelRepository(BaseSyncRepository[YourModel]):
def your_custom_repo_method(self) -> YourModel: ...


class YourModelService(BaseSyncService[YourModel, YourModelDetail, YourModelList]):
detail_schema = YourModelDetail
list_schema = YourModelList
not_found_message = "YourModel entity not found in database"
not_found_exception = HTTPException

def init_repositories(self, session: "Session") -> None:
self.your_model_repo = YourModelRepository(session)

def your_custom_service_method(self) -> YourModelDetail:
return self.resolve(self.your_model_repo.your_custom_repo_method())


class Container(BaseSyncContainer):

@cached_property
def your_model_service(self):
return YourModelService(self.session, self.request)
```

and finally you can use Container in your routes like this:

```python
# your prepared code below

@app.get("/", response_model=YourModelDetail)
def get_your_model(container: Container = Depends()):
return container.your_model_service.your_custom_service_method()
```
846 changes: 685 additions & 161 deletions pdm.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,19 @@ dev = [

[project]
name = "sqlrepo"
version = "1.4.2"
version = "1.5.0"
description = "sqlalchemy repositories with crud operations and other utils for it."
authors = [{ name = "Dmitriy Lunev", email = "dima.lunev14@gmail.com" }]
requires-python = ">=3.11"
readme = "README.md"
license = { text = "MIT" }
dependencies = [
"sqlalchemy>=2.0.29",
"python-dev-utils[sqlalchemy_filters]>=1.8.3",
"python-dev-utils[sqlalchemy_filters]>=1.11.0",
]

[project.optional-dependencies]
fastapi = ["fastapi>=0.100"]

[build-system]
requires = ["pdm-backend"]
Expand Down
Empty file added sqlrepo/ext/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions sqlrepo/ext/fastapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .containers import BaseAsyncContainer as BaseAsyncContainer
from .containers import BaseSyncContainer as BaseSyncContainer
from .containers import add_container_overrides as add_container_overrides
from .services import BaseAsyncService as BaseAsyncService
from .services import BaseService as BaseService
from .services import BaseSyncService as BaseSyncService
60 changes: 60 additions & 0 deletions sqlrepo/ext/fastapi/containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import TYPE_CHECKING, Protocol

from fastapi import Depends, Request

if TYPE_CHECKING:
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.session import Session

class SyncSessionDependsProtocol(Protocol):
"""Sync session depends protocol for FastAPI framework."""

@staticmethod
def __call__() -> Session: ... # noqa: D102

class AsyncSessionDependsProtocol(Protocol):
"""Async session depends protocol for FastAPI framework."""

@staticmethod
async def __call__() -> AsyncSession: ... # noqa: D102


def _get_session_stub() -> None:
"""Stub function, that will be overridden by main plug functions."""


def add_container_overrides(
app: "FastAPI",
session_depends: "SyncSessionDependsProtocol | AsyncSessionDependsProtocol",
) -> "FastAPI":
"""Container plugin function.
Add dependency override for user-defined SQLAlchemy session (sync or async) and return app back.
"""
app.dependency_overrides[_get_session_stub] = session_depends
return app


class BaseSyncContainer:
"""Base container class with sync interface."""

def __init__( # pragma: no coverage
self,
request: Request,
session: "Session" = Depends(_get_session_stub),
) -> None:
self.request = request
self.session = session


class BaseAsyncContainer:
"""Base container class with async interface."""

def __init__( # pragma: no coverage
self,
request: Request,
session: "AsyncSession" = Depends(_get_session_stub),
) -> None:
self.request = request
self.session = session
8 changes: 8 additions & 0 deletions sqlrepo/ext/fastapi/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class NotSetType:
"""Class, that represents not set attributes."""

def __bool__(self) -> bool: # noqa: D105 # pragma: no coverage
return False


NotSet = NotSetType()
191 changes: 191 additions & 0 deletions sqlrepo/ext/fastapi/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import importlib
import warnings
from inspect import isclass
from typing import TYPE_CHECKING, Any, ForwardRef, Generic, TypeVar, get_args

from dev_utils.verbose_http_exceptions import BaseVerboseHTTPException
from fastapi import HTTPException, status
from pydantic import BaseModel, TypeAdapter
from sqlalchemy.orm.decl_api import DeclarativeBase

from sqlrepo.ext.fastapi.helpers import NotSet, NotSetType
from sqlrepo.logging import logger

if TYPE_CHECKING:
from collections.abc import Sequence

from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.session import Session


TModel = TypeVar("TModel", bound=DeclarativeBase)
TDetailSchema = TypeVar("TDetailSchema", bound=BaseModel)
VListSchema = TypeVar("VListSchema", bound=BaseModel)


def resolve_type(
cls: type['BaseService[TModel, TDetailSchema, VListSchema]'],
value: Any, # noqa: ANN401
) -> Any | None: # noqa: ANN401
"""Resolve given generic type of BaseService to real type."""
if isinstance(value, ForwardRef):
try:
module = vars(cls).get("__module__")
if not module: # pragma: no coverage
msg = (
f"No attribute __module__ in {cls}. Can't import global context for "
"ForwardRef resolving."
)
raise TypeError(msg) # noqa: TRY301
detail_schema_globals = vars(importlib.import_module(module))
return eval( # noqa: S307
value.__forward_arg__,
detail_schema_globals,
)
except Exception as exc:
msg = (
"Can't evaluate ForwardRef of generic type. "
"Don't use type in generic with quotes. "
f"Original exception: {str(exc)}"
)
warnings.warn(msg, ServiceClassIncorrectUseWarning, stacklevel=2)
return
elif isinstance(value, TypeVar):
msg = "GenericType was not passed for pydantic BaseModel subclass."
warnings.warn(msg, ServiceClassIncorrectUseWarning, stacklevel=2)
return
elif not issubclass(value, BaseModel):
msg = "Passed GenericType is not pydantic BaseModel subclass."
warnings.warn(msg, ServiceClassIncorrectUseWarning, stacklevel=2)
return
return value


class ServiceClassIncorrectUseWarning(Warning):
"""Service class incorrect use warning."""


class BaseService(Generic[TModel, TDetailSchema, VListSchema]):
"""Base service class."""

__inheritance_check_model_class__: bool = True

def __init_subclass__(cls) -> None: # noqa: D105
super().__init_subclass__()
if not isinstance(
cls.detail_schema, # type: ignore
NotSetType,
) and not isinstance(
cls.list_schema, # type: ignore
NotSetType,
):
msg = "All needed attributes are set and nothing to do."
logger.debug(msg)
return
if cls.__inheritance_check_model_class__ is False:
cls.__inheritance_check_model_class__ = True
msg = f"Skip all generic type checking in {cls.__name__}."
logger.debug(msg)
return
try:
# PEP-560: https://peps.python.org/pep-0560/
# NOTE: this code is needed for getting type from generic: Generic[int] -> int type
# get_args get params from __orig_bases__, that contains Generic passed types.
_, detail_schema_type, list_schema_type, *_ = get_args(cls.__orig_bases__[0]) # type: ignore
except Exception as exc: # pragma: no coverage
msg = (
f"Error during getting information about Generic types for {cls.__name__}. "
f"Original exception: {str(exc)}"
)
warnings.warn(msg, ServiceClassIncorrectUseWarning, stacklevel=2)
return
if (
isinstance(getattr(cls, "detail_schema", NotSet), NotSetType)
and (detail_schema_type := resolve_type(cls, detail_schema_type)) is not None
):
cls.detail_schema = detail_schema_type # type: ignore
if (
isinstance(getattr(cls, "list_schema", NotSet), NotSetType)
and (list_schema_type := resolve_type(cls, list_schema_type)) is not None
):
cls.list_schema = list_schema_type # type: ignore

detail_schema: "type[TDetailSchema] | NotSetType" = NotSet
list_schema: "type[VListSchema]| NotSetType" = NotSet
not_found_message: "str | NotSetType" = NotSet
not_found_exception: "Exception | type[Exception] | NotSetType " = NotSet

def _resolve_entity_not_found(self) -> None:
message = "Entity not found."
if not isinstance(self.not_found_message, NotSetType):
message = self.not_found_message
if isinstance(self.not_found_exception, NotSetType):
msg = "not_found_exception must be set, if you use resolve_entity in your code."
raise AttributeError(msg) # noqa: TRY004
if not isclass(self.not_found_exception):
raise self.not_found_exception
if issubclass(self.not_found_exception, HTTPException):
raise self.not_found_exception(
detail=message,
status_code=status.HTTP_404_NOT_FOUND,
)
if issubclass(self.not_found_exception, BaseVerboseHTTPException):
message = self.not_found_exception.message or message
raise self.not_found_exception(
message=message,
status_code=status.HTTP_404_NOT_FOUND,
)
raise self.not_found_exception(message)

def resolve_entity(self, entity: "TModel | None") -> "TDetailSchema": # noqa: ANN401
"""Resolve given SQLAlchemy entity and return pydantic schema."""
if entity is None:
self._resolve_entity_not_found()
if isinstance(self.detail_schema, NotSetType):
msg = "detail_schema must be set, if you use resolve_entity in your code."
raise AttributeError(msg) # noqa: TRY004
return self.detail_schema.model_validate(entity, from_attributes=True)

def resolve_entity_list(self, entities: "Sequence[TModel]") -> "list[VListSchema]":
"""Resolve given SQLAlchemy entity and return pydantic schema."""
if isinstance(self.list_schema, NotSetType):
msg = "list_schema must be set, if you use resolve_entity in your code."
raise AttributeError(msg) # noqa: TRY004
return TypeAdapter(list[self.list_schema]).validate_python(entities, from_attributes=True)


class BaseAsyncService(BaseService[TModel, TDetailSchema, VListSchema]):
"""Base service with async interface."""

__inheritance_check_model_class__: bool = False

def init_repositories(self, session: "AsyncSession") -> None:
"""Init repositories.
Define your own method for it and specify your own methods for working with repositories.
"""
raise NotImplementedError()

def __init__(self, session: "AsyncSession", request: "Request") -> None: # pragma: no coverage
self.session = session
self.request = request
self.init_repositories(session)


class BaseSyncService(BaseService[TModel, TDetailSchema, VListSchema]):
"""Base service with async interface."""

__inheritance_check_model_class__: bool = False

def init_repositories(self, session: "Session") -> None:
"""Init repositories.
Define your own method for it and specify your own methods for working with repositories.
"""
raise NotImplementedError()

def __init__(self, session: "Session", request: "Request") -> None: # pragma: no coverage
self.session = session
self.request = request
self.init_repositories(session)
Loading

0 comments on commit 7856ae4

Please sign in to comment.