Skip to content

Commit

Permalink
New way to build app
Browse files Browse the repository at this point in the history
  • Loading branch information
lsbardel committed Nov 21, 2024
1 parent 80250ef commit f8e09f4
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 65 deletions.
9 changes: 2 additions & 7 deletions examples/main_tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from examples import tasks
from fluid.scheduler import TaskScheduler
from fluid.utils import log
from fluid.scheduler.cli import TaskManagerCLI

task_manager = TaskScheduler()
task_manager.register_from_module(tasks)
task_manager_cli = task_manager.cli()
task_manager_cli = TaskManagerCLI("examples.tasks:task_app")


if __name__ == "__main__":
log.config(app_names=["fluid"])
task_manager_cli()
11 changes: 10 additions & 1 deletion examples/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,17 @@
from datetime import timedelta
from typing import cast

from fluid.scheduler import TaskRun, every, task
from fastapi import FastAPI

from fluid.scheduler import TaskRun, TaskScheduler, every, task
from fluid.scheduler.broker import RedisTaskBroker
from fluid.scheduler.endpoints import setup_fastapi


def task_app() -> FastAPI:
task_manager = TaskScheduler()
task_manager.register_from_dict(globals())
return setup_fastapi(task_manager)


@task
Expand Down
2 changes: 1 addition & 1 deletion fluid/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Reusable server side python modules"""

__version__ = "1.1.3"
__version__ = "1.2.0"
48 changes: 32 additions & 16 deletions fluid/scheduler/cli.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,47 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Callable

import click
import uvicorn
from fastapi import FastAPI
from rich.console import Console
from rich.table import Table
from uvicorn.importer import import_from_string

from fluid.tools_fastapi import serve_app

from .endpoints import setup_fastapi
from fluid.utils import log

if TYPE_CHECKING:
from .consumer import TaskManager
from .models import TaskRun


TaskManagerApp = FastAPI | Callable[..., Any] | str


class TaskManagerCLI(click.Group):
def __init__(
self, task_manager: TaskManager, *, app: FastAPI | None = None, **kwargs: Any
):
def __init__(self, task_manager_app: TaskManagerApp, **kwargs: Any):
kwargs.setdefault("commands", DEFAULT_COMMANDS)
super().__init__(**kwargs)
self.task_manager = task_manager
self.app = app
self.task_manager_app = task_manager_app


def ctx_task_manager(ctx: click.Context) -> TaskManager:
return ctx.parent.command.task_manager # type: ignore
def ctx_task_manager_app(ctx: click.Context) -> TaskManagerApp:
return ctx.parent.command.task_manager_app # type: ignore


def ctx_app(ctx: click.Context) -> FastAPI | None:
return ctx.parent.command.app # type: ignore
def ctx_app(ctx: click.Context) -> FastAPI:
app = ctx_task_manager_app(ctx) # type: ignore
if isinstance(app, str):
return import_from_string(app)()
elif isinstance(app, FastAPI):
return app
else:
return app()


def ctx_task_manager(ctx: click.Context) -> TaskManager:
return ctx_app(ctx).state.task_manager


@click.command()
Expand Down Expand Up @@ -98,9 +108,15 @@ def execute(ctx: click.Context, task: str, dry_run: bool) -> None:
@click.pass_context
def serve(ctx: click.Context, host: str, port: int, reload: bool) -> None:
"""Run the service."""
task_manager = ctx_task_manager(ctx)
app = setup_fastapi(task_manager, app=ctx_app(ctx))
serve_app(app, host, port, reload)
task_manager_app = ctx_task_manager_app(ctx)
uvicorn.run(
task_manager_app,
port=port,
host=host,
log_level="info",
reload=reload,
log_config=log.config(),
)


DEFAULT_COMMANDS = (ls, execute, serve)
Expand Down
24 changes: 7 additions & 17 deletions fluid/scheduler/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@
TaskState,
)

try:
from .cli import TaskManagerCLI
except ImportError:
TaskManagerCLI = None # type: ignore[assignment,misc]


AsyncHandler = Callable[[TaskRun], Awaitable[None]]

logger = log.get_logger(__name__)
Expand Down Expand Up @@ -152,6 +146,13 @@ def register_from_module(self, module: Any) -> None:
if isinstance(obj := getattr(module, name), Task):
self.register_task(obj)

def register_from_dict(self, data: dict) -> None:
for name, obj in data.items():
if name.startswith("_"):
continue
if isinstance(obj, Task):
self.register_task(obj)

def register_async_handler(self, event: str, handler: AsyncHandler) -> None:
"""Register an async handler for a given event
Expand All @@ -165,17 +166,6 @@ def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None:
"""
return None

def cli(self, **kwargs: Any) -> Any:
"""Create the task manager command line interface"""
try:
from fluid.scheduler.cli import TaskManagerCLI
except ImportError:
raise ImportError(
"TaskManagerCLI is not available - "
"install with `pip install aio-fluid[cli]`"
) from None
return TaskManagerCLI(self, **kwargs)

async def _execute_and_exit(self, task: Task | str, **params: Any) -> TaskRun:
async with self:
return await self.execute(task, **params)
Expand Down
6 changes: 4 additions & 2 deletions fluid/scheduler/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ def setup_fastapi(
task_manager: TaskManager,
*,
app: FastAPI | None = None,
include_router: bool = True,
**kwargs: Any,
) -> FastAPI:
"""Setup the FastAPI app"""
"""Setup the FastAPI app and add the task manager to the state"""
app = app or FastAPI(**kwargs)
app.include_router(router, tags=["Tasks"])
if include_router:
app.include_router(router, tags=["Tasks"])
app.state.task_manager = task_manager
if isinstance(task_manager, Worker):
app_workers(app).add_workers(task_manager)
Expand Down
21 changes: 1 addition & 20 deletions fluid/tools_fastapi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,17 @@
from __future__ import annotations

from typing import Any, Callable

import uvicorn
from fastapi import FastAPI

from fluid.utils import log
from fluid.utils.worker import Workers

from .service import FastapiAppWorkers


def app_workers(app: FastAPI) -> Workers:
"""Get workers from app state or create new workers."""
if workers := getattr(app.state, "workers", None):
return workers
else:
workers = FastapiAppWorkers.setup(app)
app.state.workers = workers
return workers


def serve_app(
app: FastAPI | Callable[..., Any] | str,
host: str,
port: int,
reload: bool = False,
) -> None:
uvicorn.run(
app,
port=port,
host=host,
log_level="info",
reload=reload,
log_config=log.config(),
)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aio-fluid"
version = "1.1.3"
version = "1.2.0"
description = "Tools for backend python services"
license = "BSD"
authors = ["Luca <luca@quantmind.com>"]
Expand Down

0 comments on commit f8e09f4

Please sign in to comment.