Skip to content

Commit 0fa96c7

Browse files
committed
Add api
1 parent d106203 commit 0fa96c7

9 files changed

+218
-4
lines changed

Makefile

+6
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,9 @@ publish: ## release to pypi and github tag
4343

4444
outdated: ## Show outdated packages
4545
poetry show -o -a
46+
47+
48+
49+
.PHONY: example
50+
example: ## run task scheduler example
51+
@poetry run python -m examples.all_features

examples/__init__.py

Whitespace-only changes.

examples/all_features.py

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import uvicorn
2+
from fluid.scheduler import TaskScheduler
3+
from fluid.tasks_api import task_manager_routes
4+
from fastapi import FastAPI
5+
from . import tasks
6+
7+
8+
def main():
9+
task_manager = TaskScheduler()
10+
task_manager.register_from_module(tasks)
11+
app = FastAPI()
12+
task_manager_routes(app, task_manager)
13+
uvicorn.run(app)
14+
15+
16+
if __name__ == "__main__":
17+
main()

examples/tasks.py

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import asyncio
2+
import os
3+
from datetime import timedelta
4+
from typing import cast
5+
6+
from fluid.scheduler import TaskRun, every, task
7+
8+
9+
@task
10+
async def dummy(context: TaskRun) -> float:
11+
sleep = cast(float, context.params.get("sleep", 0.1))
12+
await asyncio.sleep(sleep)
13+
if context.params.get("error"):
14+
raise RuntimeError("just an error")
15+
return sleep
16+
17+
18+
@task(schedule=every(timedelta(seconds=1)))
19+
async def scheduled(context: TaskRun) -> str:
20+
"""A simple scheduled task"""
21+
await asyncio.sleep(0.1)
22+
return "OK"
23+
24+
25+
@task
26+
async def disabled(context: TaskRun) -> float:
27+
sleep = cast(float, context.params.get("sleep", 0.1))
28+
await asyncio.sleep(sleep)
29+
return sleep
30+
31+
32+
@task(cpu_bound=True)
33+
async def cpu_bound(context: TaskRun) -> int:
34+
await asyncio.sleep(0.1)
35+
redis = context.task_manager.broker.redis_cli
36+
await redis.setex(context.run_id, os.getpid(), 10)
37+
return 0

fluid/tasks_api/__init__.py

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from fastapi import FastAPI
2+
3+
from fluid.scheduler import TaskManager
4+
from fluid.utils.worker import Workers
5+
6+
from .endpoints import router
7+
from .service import FastapiAppWorkers
8+
9+
10+
def app_workers(app: FastAPI) -> Workers:
11+
if workers := getattr(app.state, "workers", None):
12+
return workers
13+
else:
14+
workers = FastapiAppWorkers.setup(app)
15+
app.state.workers = workers
16+
return workers
17+
18+
19+
def task_manager_routes(app: FastAPI, task_manager: TaskManager) -> None:
20+
app.state.task_manager = task_manager
21+
app.include_router(router, tags=["Tasks"])
22+
app_workers(app).add_workers(task_manager)

fluid/tasks_api/endpoints.py

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
from typing import Annotated
2+
3+
from fastapi import APIRouter, Depends, HTTPException, Path, Request
4+
from pydantic import BaseModel, Field
5+
6+
from fluid.scheduler import QueuedTask, TaskInfo, TaskManager, TaskPriority
7+
from fluid.scheduler.errors import UnknownTaskError
8+
9+
router = APIRouter()
10+
11+
12+
def get_task_manger(request: Request) -> TaskManager:
13+
return request.app.state.task_manager
14+
15+
16+
TaskManagerDep = Annotated[TaskManager, Depends(get_task_manger)]
17+
18+
19+
class TaskUpdate(BaseModel):
20+
enabled: bool = Field(description="Task enabled or disabled")
21+
22+
23+
class TaskCreate(BaseModel):
24+
name: str = Field(description="Task name")
25+
params: dict = Field(description="Task parameters", default_factory=dict)
26+
priority: TaskPriority | None = Field(default=None, description="Task priority")
27+
28+
29+
@router.get(
30+
"/tasks",
31+
response_model=list[TaskInfo],
32+
summary="List Tasks",
33+
description="Retrieve a list of tasks runs",
34+
)
35+
async def get_tasks(task_manager: TaskManagerDep) -> list[TaskInfo]:
36+
return await task_manager.broker.get_tasks_info()
37+
38+
39+
@router.post(
40+
"/tasks",
41+
response_model=QueuedTask,
42+
summary="Queue a new Tasks",
43+
description="Queue a new task to be run",
44+
)
45+
async def queue_task(
46+
task_manager: TaskManagerDep,
47+
task: TaskCreate,
48+
) -> QueuedTask:
49+
try:
50+
return task_manager.queue(task.name, task.priority, **task.params)
51+
except UnknownTaskError as exc:
52+
raise HTTPException(status_code=404, detail="Task not found") from exc
53+
54+
55+
@router.patch(
56+
"/tasks/{task_name}",
57+
response_model=TaskInfo,
58+
summary="Update a task",
59+
description="Update a task configuration and enable/disable it",
60+
)
61+
async def patch_task(
62+
task_manager: TaskManagerDep,
63+
task_update: TaskUpdate,
64+
task_name: str = Path(title="Task name"),
65+
) -> TaskInfo:
66+
try:
67+
return await task_manager.broker.enable_task(task_name, task_update.enabled)
68+
except UnknownTaskError as exc:
69+
raise HTTPException(status_code=404, detail="Task not found") from exc

fluid/tasks_api/service.py

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import os
2+
import signal
3+
from typing import Any, Self
4+
5+
from fastapi import FastAPI
6+
7+
from fluid import settings
8+
from fluid.utils.worker import Worker, Workers
9+
10+
logger = settings.get_logger(__name__)
11+
12+
13+
class FastapiAppWorkers(Workers):
14+
"""An aiohttp runner"""
15+
16+
@classmethod
17+
def setup(cls, app: FastAPI, **kwargs: Any) -> Self:
18+
"""Setup the app runner"""
19+
workers = cls(**kwargs)
20+
app.state.workers = workers
21+
app.add_event_handler("startup", workers.startup)
22+
app.add_event_handler("shutdown", workers.shutdown)
23+
return workers
24+
25+
def bail_out(self, reason: str, code: int = 1) -> None:
26+
logger.warning("shutting down due to %s", reason)
27+
os.kill(os.getpid(), signal.SIGTERM)
28+
29+
def get_active_worker(self, *, worker_name: str) -> Worker | None:
30+
worker = self._workers.get_worker_by_name(worker_name)
31+
if worker and not worker.stopping:
32+
return worker
33+
return None

poetry.lock

+31-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ async-timeout = "^4.0.3"
1818
inflection = "^0.5.1"
1919
redis = "^5.0.1"
2020
pydantic = "^2.0.3"
21+
yarl = "^1.9.4"
2122
aioconsole = {version = "^0.7.0", optional = true }
2223
sqlalchemy = {version = "^2.0.23", optional = true }
2324
fastapi = {version = "^0.110.0", optional = true}
24-
yarl = "^1.9.4"
25+
uvicorn = {version = "^0.29.0", optional = true}
2526

2627
[tool.poetry.group.dev.dependencies]
2728
pytest = "^8.1.1"
@@ -41,7 +42,7 @@ optional = true
4142
mkdocs-material = "^9.5.15"
4243

4344
[tool.poetry.extras]
44-
fastapi = ["fastapi", "aioconsole"]
45+
fastapi = ["fastapi", "aioconsole", "uvicorn"]
4546
db = ["sqlalchemy"]
4647

4748
[build-system]

0 commit comments

Comments
 (0)