Skip to content

Commit

Permalink
release
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca committed Apr 24, 2022
1 parent f9fd6f2 commit f88786c
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"env": {},
"args": [
"-x",
"tests/scheduler/test_scheduler.py"
"tests/scheduler/test_every.py"
],
"debugOptions": [
"RedirectOutput"
Expand Down
2 changes: 1 addition & 1 deletion fluid/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""Reusable server side python modules"""
__version__ = "0.4.2"
__version__ = "0.4.3"
25 changes: 8 additions & 17 deletions fluid/scheduler/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from .constants import TaskPriority, TaskState
from .task import Task
from .task_run import TaskRun
from .utils import WaitFor

ConsumerCallback = Callable[[TaskRun, "TaskManager"], None]
AsyncExecutor = Callable[..., Coroutine[Any, Any, None]]
Expand Down Expand Up @@ -164,14 +163,9 @@ def num_concurrent_tasks_for(self, task_name: str) -> int:
"""The number of concurrent tasks for a given task_name"""
return len(self._concurrent_tasks[task_name])

async def queue_and_wait(self, task: str, **params: Any) -> TaskRun:
run_id = self.execute(task, **params).id
waitfor = WaitFor(run_id=run_id)
self.register_handler(f"end.{waitfor.run_id}", waitfor)
try:
return await waitfor.waiter
finally:
self.unregister_handler(f"end.{waitfor.run_id}")
async def queue_and_wait(self, task: str, **params: Any) -> Any:
"""Execute a task by-passing the broker task queue and wait for result"""
return await self.execute(task, **params).waiter

def execute(self, task: str, **params: Any) -> TaskRun:
"""Execute a Task by-passing the broker task queue"""
Expand Down Expand Up @@ -210,20 +204,17 @@ async def _consume_tasks(self) -> None:
task_run.start = microseconds()
task_run.set_state(TaskState.running)
task_context = task_run.task.create_context(self, task_run=task_run)
info = await self.broker.get_tasks_info(task_name)
if not info[0].enabled:
task_run.set_state(TaskState.aborted)
task_run.waiter.set_result(None)
self._concurrent_tasks[task_name][task_run.id] = task_run
#
elif task_run.task.max_concurrency <= self.num_concurrent_tasks_for(
task_name
):
if task_run.task.max_concurrency < self.num_concurrent_tasks_for(task_name):
task_run.set_state(TaskState.rate_limited)
task_run.waiter.set_result(None)
elif not (await self.broker.get_tasks_info(task_name))[0].enabled:
task_run.set_state(TaskState.aborted)
task_run.waiter.set_result(None)
#
else:
task_context.logger.info("start")
self._concurrent_tasks[task_name][task_run.id] = task_run
self.dispatch(task_run, "start")
try:
result = await task_run.task.executor(task_context)
Expand Down
9 changes: 8 additions & 1 deletion fluid/scheduler/every.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@


class every(Scheduler):
def __init__(self, delta: timedelta):
def __init__(self, delta: timedelta, delay: timedelta = timedelta()):
self.delta = delta
self.delay = delay
self._started = None

def info(self) -> str:
return str(self.delta)

def __call__(
self, timestamp: datetime, last_run: Optional[CronRun] = None
) -> Optional[CronRun]:
if not last_run and self.delay:
if not self._started:
self._started = timestamp
if timestamp - self._started < self.delay:
return None
year, month, day, hour, minute, second, _, _, _ = timestamp.timetuple()
run = CronRun(year, month, day, hour, minute, second)
if not last_run or timestamp - last_run.datetime >= self.delta:
Expand Down
16 changes: 0 additions & 16 deletions fluid/webcli.py

This file was deleted.

87 changes: 39 additions & 48 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aio-fluid"
version = "0.4.2"
version = "0.4.3"
description = "Tools for backend python services"
license = "BSD"
authors = ["Luca <luca@quantmind.com>"]
Expand Down Expand Up @@ -34,7 +34,7 @@ classifiers = [

[tool.poetry.dependencies]
python = ">=3.8,<4"
aio-openapi = "^2.8.0"
aio-openapi = "^2.9.0"
aioredis = "^2.0.0"
ujson = "^5.1.0"
inflection = "^0.5.1"
Expand All @@ -45,13 +45,13 @@ python-slugify = {version = "^6.1.0", extras = ["unidecode"]}
python-json-logger = "^2.0.2"
colorlog = "^6.6.0"
aiohttp_cors = "^0.7.0"
aiobotocore = {version = "^2.1.0", extras=["boto3"]}
s3fs = {version = "^2022.2.0"}
aiobotocore = {version = "~2.2.0", extras=["boto3"]}
s3fs = {version = "^2022.3.0"}
aio-kong = "^0.9.0"
uvloop = "^0.16.0"

[tool.poetry.dev-dependencies]
pytest = "^6.2.4"
pytest = "^7.1.2"
isort = "^5.9.3"
black = "^22.1.0"
flake8 = "^4.0.1"
Expand All @@ -60,7 +60,7 @@ flake8-commas = "^2.0.0"
pytest-cov = "^3.0.0"
pytest-aiohttp = "^1.0.4"
codecov = "^2.1.12"
mypy = "^0.931"
mypy = "^0.942"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import os

import pytest
from openapi.testing import TestClient, app_cli

from fluid.redis import FluidRedis
from fluid.webcli import TestClient, app_cli

from .app import AppClient, create_app

Expand Down
11 changes: 11 additions & 0 deletions tests/scheduler/test_every.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import asyncio
from datetime import datetime, timedelta

from fluid.scheduler import every


async def test_delay():
scheduler = every(timedelta(seconds=10), delay=timedelta(seconds=0.3))
assert scheduler(datetime.now()) is None
await asyncio.sleep(0.4)
assert scheduler(datetime.now()) is not None
Loading

0 comments on commit f88786c

Please sign in to comment.