Skip to content

Commit

Permalink
Reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
lsbardel committed Mar 29, 2024
1 parent efcfbff commit d811720
Show file tree
Hide file tree
Showing 18 changed files with 907 additions and 1,599 deletions.
2 changes: 1 addition & 1 deletion dev/install → .dev/install
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/env bash
pip install -U pip poetry
poetry install
poetry install --all-extras
4 changes: 2 additions & 2 deletions dev/lint → .dev/lint
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ fi
echo "run black"
black fluid tests ${BLACK_ARG}
echo "run ruff"
ruff fluid tests ${RUFF_ARG}
ruff check fluid tests ${RUFF_ARG}
echo "run mypy"
mypy fluid/scheduler tests/scheduler
mypy fluid tests
File renamed without changes.
6 changes: 3 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ jobs:
PYPI_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
strategy:
matrix:
python-version: ["3.10", "3.11"]
python-version: ["3.11", "3.12"]

steps:
- name: checkout code
uses: actions/checkout@v2
uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- name: Start Redis
Expand Down
10 changes: 3 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,12 @@ clean: ## remove python cache files
rm -rf .coverage


install: ## install packages in virtualenv
@./dev/install
install: ## install dev packages via poetry
@./.dev/install


lint: ## run linters
poetry run ./dev/lint fix


mypy: ## run mypy
@poetry run mypy fluid
poetry run ./.dev/lint fix


test: ## test with coverage
Expand Down
1 change: 1 addition & 0 deletions fluid/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Reusable server side python modules"""

__version__ = "0.7.0"
6 changes: 2 additions & 4 deletions fluid/scheduler/cpubound.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ def __call__(self, executor: TaskExecutor) -> Task:


@overload
def cpu_task(executor: TaskExecutor) -> Task:
...
def cpu_task(executor: TaskExecutor) -> Task: ...


@overload
Expand All @@ -77,8 +76,7 @@ def cpu_task(
randomize: RandomizeType | None = None,
max_concurrency: int = 1,
priority: TaskPriority = TaskPriority.medium,
) -> CpuTaskConstructor:
...
) -> CpuTaskConstructor: ...


def cpu_task(
Expand Down
1 change: 1 addition & 0 deletions fluid/scheduler/crontab.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Originally from https://github.com/coleifer/huey
"""

import re
from abc import ABC, abstractmethod
from datetime import datetime
Expand Down
35 changes: 3 additions & 32 deletions fluid/scheduler/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,19 @@
import inspect
import logging
import os
from importlib import import_module
from typing import (
TYPE_CHECKING,
Any,
Callable,
Coroutine,
NamedTuple,
TypeVar,
cast,
overload,
)
from uuid import uuid4

from openapi.spec.utils import trim_docstring

from fluid import log
from fluid.tools_aiohttp.node import WorkerApplication
from fluid.utils import microseconds
from fluid.tools.text import trim_docstring

from .constants import TaskPriority
from .crontab import Scheduler
Expand Down Expand Up @@ -49,18 +44,6 @@ class ImproperlyConfigured(RuntimeError):
pass


def create_task_app() -> WorkerApplication:
if not TASK_MANAGER_APP:
raise ImproperlyConfigured("missing TASK_MANAGER_APP environment variable")
bits = TASK_MANAGER_APP.split(":")
if len(bits) != 2:
raise ImproperlyConfigured(
"TASK_MANAGER_APP must be of the form <module>:<function>"
)
mod = import_module(bits[0])
return cast(WorkerApplication, getattr(mod, bits[1])())


class TaskContext(NamedTuple):
task_manager: "TaskManager"
task_run: TaskRun
Expand Down Expand Up @@ -111,16 +94,6 @@ class Task(NamedTuple):
async def register(self, broker: "Broker") -> None:
pass

async def __call__(
self,
task_manager: TaskManager | None = None,
**kwargs: Any,
) -> Any:
if task_manager is None:
task_manager = create_task_app()["task_manager"]
context = self.create_context(task_manager, **kwargs)
return await self.executor(context)

def create_context(
self,
task_manager: TaskManager,
Expand All @@ -144,8 +117,7 @@ def create_context(


@overload
def task(executor: TaskExecutor) -> Task:
...
def task(executor: TaskExecutor) -> Task: ...


@overload
Expand All @@ -157,8 +129,7 @@ def task(
randomize: RandomizeType | None = None,
max_concurrency: int = 1,
priority: TaskPriority = TaskPriority.medium,
) -> TaskConstructor:
...
) -> TaskConstructor: ...


# implementation of the task decorator
Expand Down
14 changes: 12 additions & 2 deletions fluid/settings.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import os

from .tools.text import str2bool
from .tools.text import to_bool

# Workers
STOPPING_GRACE_PERIOD: int = int(os.getenv("STOPPING_GRACE_PERIOD") or "10")

# Redis

# Database
DBPOOL_MAX_SIZE: int = int(os.getenv("DBPOOL_MAX_SIZE") or "10")
DBPOOL_MAX_OVERFLOW: int = int(os.getenv("DBPOOL_MAX_OVERFLOW") or "10")
DBECHO: bool = str2bool(os.getenv("DBECHO") or "no")
DBECHO: bool = to_bool(os.getenv("DBECHO") or "no")


# Flamegraph
STACK_SAMPLER_PERIOD: int = int(os.getenv("STACK_SAMPLER_PERIOD", "60"))
FLAMEGRAPH_EXECUTABLE: str = os.getenv("FLAMEGRAPH_EXECUTABLE") or "/bin/flamegraph.pl"
FLAMEGRAPH_DATA_BUCKET, FLAMEGRAPH_DATA_PATH = os.getenv(
"FLAMEGRAPH_DATA_BUCKET_PATH", "replace/me"
).split("/")
28 changes: 19 additions & 9 deletions fluid/tools/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, Generic, TypeVar
from typing import Awaitable, Callable, Dict, Generic, TypeVar

MessageType = TypeVar("MessageType")
MessageHandlerType = TypeVar("MessageHandlerType")


class BaseDispatcher(Generic[MessageType, MessageHandlerType], ABC):
def __init__(self) -> None:
self._msg_handlers: Dict[str, Dict[str, MessageHandlerType]] = defaultdict(
self._msg_handlers: Dict[str, dict[str, MessageHandlerType]] = defaultdict(
dict,
)

Expand All @@ -18,11 +18,20 @@ def register_handler(
message_type: str,
handler: MessageHandlerType,
tag: str = "",
) -> None:
) -> MessageHandlerType | None:
previous = self._msg_handlers[message_type].get(tag)
self._msg_handlers[message_type][tag] = handler
return previous

def unregister_handler(self, message_type: str, tag: str = "") -> None:
self._msg_handlers[message_type].pop(tag)
def unregister_handler(
self, message_type: str, tag: str = ""
) -> MessageHandlerType | None:
return self._msg_handlers[message_type].pop(tag, None)

def on(
self, handler: MessageHandlerType, tag: str = ""
) -> MessageHandlerType | None:
return self.register_handler("*", handler, tag)

def get_handlers(
self,
Expand All @@ -45,10 +54,6 @@ def dispatch(self, message: MessageType) -> int:
handler(message)
return len(handlers or ())

@abstractmethod
def create_message(self, **kwargs: Any) -> MessageType:
"""create a message"""


class AsyncDispatcher(
BaseDispatcher[MessageType, Callable[[MessageType], Awaitable[None]]],
Expand All @@ -59,3 +64,8 @@ async def dispatch(self, message: MessageType) -> int:
if handlers:
await asyncio.gather(*[handler(message) for handler in handlers.values()])
return len(handlers or ())


class SimpleDispatcher(Dispatcher[MessageType]):
def message_type(self, message: MessageType) -> str:
return "*"
Loading

0 comments on commit d811720

Please sign in to comment.