Skip to content

Commit

Permalink
fastapi always installed
Browse files Browse the repository at this point in the history
  • Loading branch information
lsbardel committed Jun 16, 2024
1 parent 0fa96c7 commit c57e6f6
Show file tree
Hide file tree
Showing 36 changed files with 2,034 additions and 1,047 deletions.
2 changes: 1 addition & 1 deletion .dev/lint
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ black fluid tests ${BLACK_ARG}
echo "run ruff"
ruff check fluid tests ${RUFF_ARG}
echo "run mypy"
mypy fluid tests
mypy fluid/scheduler
14 changes: 14 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,19 @@
"RedirectOutput"
]
},
{
"name": "API Serve",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/examples/main.py",
"cwd": "${workspaceFolder}",
"justMyCode": false,
"args": [
"ls"
],
"debugOptions": [
"RedirectOutput"
]
},
]
}
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
BSD 3-Clause License

Copyright (c) 2023, Quantmind
Copyright (c) 2024, Quantmind
All rights reserved.

Redistribution and use in source and binary forms, with or without
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ outdated: ## Show outdated packages

.PHONY: example
example: ## run task scheduler example
@poetry run python -m examples.all_features
@poetry run python -m examples.main
17 changes: 0 additions & 17 deletions examples/all_features.py

This file was deleted.

12 changes: 12 additions & 0 deletions examples/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from examples import tasks
from fluid.scheduler import TaskScheduler
from fluid.utils import log

task_manager = TaskScheduler()
task_manager.register_from_module(tasks)
task_manager_cli = task_manager.cli()


if __name__ == "__main__":
log.config()
task_manager_cli()
15 changes: 9 additions & 6 deletions examples/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import os
import time
from datetime import timedelta
from typing import cast

from fluid.scheduler import TaskRun, every, task
from fluid.scheduler.broker import RedisBroker


@task
Expand All @@ -29,9 +31,10 @@ async def disabled(context: TaskRun) -> float:
return sleep


@task(cpu_bound=True)
async def cpu_bound(context: TaskRun) -> int:
await asyncio.sleep(0.1)
redis = context.task_manager.broker.redis_cli
await redis.setex(context.run_id, os.getpid(), 10)
return 0
@task(cpu_bound=True, schedule=every(timedelta(seconds=5)))
async def cpu_bound(context: TaskRun) -> None:
"""A CPU bound task running on subprocess"""
time.sleep(1)
broker = cast(RedisBroker, context.task_manager.broker)
redis = broker.redis_cli
await redis.setex(context.id, os.getpid(), 10)
4 changes: 2 additions & 2 deletions fluid/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from .broker import Broker, QueuedTask
from .broker import Broker
from .consumer import TaskConsumer, TaskManager
from .crontab import Scheduler, crontab
from .every import every
from .models import Task, TaskInfo, TaskPriority, TaskRun, TaskState, task
from .models import Task, TaskInfo, TaskPriority, TaskRun, TaskState, task, QueuedTask
from .scheduler import TaskScheduler

__all__ = [
Expand Down
37 changes: 12 additions & 25 deletions fluid/scheduler/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
from yarl import URL

from fluid import settings
from fluid.utils.redis import Redis, FluidRedis
from redis.asyncio import Redis
from fluid.utils.redis import FluidRedis
import json
from .errors import UnknownTaskError

from .models import QueuedTask, Task, TaskInfo, TaskPriority, TaskRun
from .models import Task, TaskInfo, TaskPriority, TaskRun, TaskInfoUpdate

if TYPE_CHECKING: # pragma: no cover
from .consumer import TaskManager
Expand Down Expand Up @@ -42,9 +43,7 @@ def task_queue_names(self) -> tuple[str, ...]:
"""Names of the task queues"""

@abstractmethod
async def queue_task(
self, task_manager: TaskManager, queued_task: QueuedTask
) -> TaskRun:
async def queue_task(self, task_run: TaskRun) -> None:
"""Queue a task"""

@abstractmethod
Expand Down Expand Up @@ -129,7 +128,7 @@ def redis(self) -> FluidRedis:
return FluidRedis.create(str(self.url.with_query({})), name=self.name)

@property
def redis_cli(self) -> Redis:
def redis_cli(self) -> Redis[bytes]:
return self.redis.redis_cli

@property
Expand Down Expand Up @@ -170,9 +169,10 @@ async def get_tasks_info(self, *task_names: str) -> list[TaskInfo]:

async def update_task(self, task: Task, params: dict[str, Any]) -> TaskInfo:
pipe = self.redis_cli.pipeline()
info = json.loads(TaskInfoUpdate(**params).model_dump_json())
pipe.hset(
self.task_hash_name(task.name),
mapping={name: json.dumps(value) for name, value in params.items()},
mapping={name: json.dumps(value) for name, value in info.items()},
)
pipe.hgetall(self.task_hash_name(task.name))
_, info = await pipe.execute()
Expand All @@ -193,8 +193,8 @@ async def close(self) -> None:

async def get_task_run(self, task_manager: TaskManager) -> TaskRun | None:
if self.task_queue_names:
if redis_data := await self.redis_cli.brpop( # type: ignore [misc]
self.task_queue_names, # type: ignore [arg-type]
if redis_data := await self.redis_cli.brpop(
self.task_queue_names,
timeout=1,
):
data = json.loads(redis_data[1])
Expand All @@ -205,31 +205,18 @@ async def get_task_run(self, task_manager: TaskManager) -> TaskRun | None:
return TaskRun(**data)
return None

async def queue_task(
self, task_manager: TaskManager, queued_task: QueuedTask
) -> TaskRun:
task_run = self.create_task_run(task_manager, queued_task)
await self.redis_cli.lpush( # type: ignore [misc]
async def queue_task(self, task_run: TaskRun) -> None:
await self.redis_cli.lpush(
self.task_queue_name(task_run.priority),
task_run.model_dump_json(),
)
return task_run

def lock(self, name: str, timeout: float | None = None) -> Lock:
return self.redis_cli.lock(name, timeout=timeout)

def _decode_task(self, task: Task, data: dict[bytes, Any]) -> TaskInfo:
info = {name.decode(): json.loads(value) for name, value in data.items()}
return TaskInfo(
name=task.name,
description=task.description,
schedule=str(task.schedule) if task.schedule else None,
priority=task.priority,
enabled=info.get("enabled", True),
last_run_duration=info.get("last_run_duration"),
last_run_end=info.get("last_run_end"),
last_run_state=info.get("last_run_state"),
)
return task.info(**info)


Broker.register_broker("redis", RedisBroker)
Expand Down
117 changes: 117 additions & 0 deletions fluid/scheduler/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import click
from rich.console import Console
from rich.table import Table

from fluid.tools_fastapi import serve_app
from .endpoints import setup_fastapi

if TYPE_CHECKING:
from .consumer import TaskManager
from .models import TaskRun


DEFAULT_COMMANDS: list[click.Command] = []


class TaskManagerCLI(click.Group):
def __init__(self, task_manager: TaskManager, **kwargs: Any):
kwargs.setdefault("commands", DEFAULT_COMMANDS)
super().__init__(**kwargs)
self.task_manager = task_manager


def ctx_task_manager(ctx: click.Context) -> TaskManager:
return ctx.parent.command.task_manager # type: ignore


@click.command()
@click.pass_context
def ls(ctx: click.Context) -> None:
"""list all tasks"""
task_manager = ctx_task_manager(ctx)
table = Table(title="Tasks")
table.add_column("Name", style="cyan", no_wrap=True)
table.add_column("Schedule", style="magenta")
table.add_column("CPU bound", style="magenta")
table.add_column("Description", style="green")
for name in sorted(task_manager.registry):
task = task_manager.registry[name]
table.add_row(
name,
str(task.schedule),
"yes" if task.cpu_bound else "no",
task.description,
)
console = Console()
console.print(table)


@click.command()
@click.pass_context
@click.argument("task")
@click.option(
"--dry-run",
is_flag=True,
help="dry run (if the tasks supports it)",
default=False,
)
def execute(ctx: click.Context, task: str, dry_run: bool) -> None:
"""execute a task"""
task_manager = ctx_task_manager(ctx)
run = task_manager.execute_sync(task, dry_run=dry_run)
console = Console()
console.print(task_run_table(run))


@click.command("serve", short_help="Start app server.")
@click.option(
"--host",
"-h",
default="0.0.0.0",
help="The interface to bind to",
show_default=True,
)
@click.option(
"--port",
"-p",
default=8080,
help="The port to bind to",
show_default=True,
)
@click.option(
"--reload",
is_flag=True,
default=False,
help="Enable auto-reload",
show_default=True,
)
@click.pass_context
def serve(ctx: click.Context, host: str, port: int, reload: bool) -> None:
"""Run the service."""
task_manager = ctx_task_manager(ctx)
app = setup_fastapi(task_manager)
serve_app(app, host, port, reload)


DEFAULT_COMMANDS = (ls, execute, serve)


def task_run_table(task_run: TaskRun) -> Table:
table = Table(title="Task Run", show_header=False)
color = "red" if task_run.state.is_failure else "green"
table.add_column("Name", style="cyan")
table.add_column("Description", style=color)
table.add_row("name", task_run.task.name)
table.add_row("description", task_run.task.description)
table.add_row("run_id", task_run.id)
table.add_row("state", task_run.state)
if task_run.start:
table.add_row("started", task_run.start.isoformat())
if task_run.end:
table.add_row("completed", task_run.end.isoformat())
table.add_row("duration ms", str(task_run.duration_ms))
return table
Loading

0 comments on commit c57e6f6

Please sign in to comment.