Skip to content

Commit

Permalink
Changes:
Browse files Browse the repository at this point in the history
- fixes py3.9
- add more documentation regarding lifecycle tasks
  • Loading branch information
devkral committed Jan 3, 2025
1 parent e92baf3 commit 6dce424
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 3 deletions.
8 changes: 5 additions & 3 deletions asyncz/stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pickle
import shutil
from contextlib import suppress
from datetime import UTC, datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union, cast

Expand Down Expand Up @@ -47,6 +47,7 @@ def __init__(
self.mode = mode
self.cleanup_directory = cleanup_directory
self.suffix = suffix
self.max_dt = datetime.max.replace(tzinfo=timezone.utc)

def check_task_id(self, task_id: str | None) -> None:
if task_id is None:
Expand All @@ -65,6 +66,8 @@ def start(self, scheduler: Any, alias: str) -> None:
self.directory.mkdir(self.mode, parents=True, exist_ok=True)
if not self.directory.is_dir():
raise RuntimeError("Not a directory.")
if self.scheduler is not None:
self.max_dt = datetime.max.replace(tzinfo=self.scheduler.timezone)

def shutdown(self) -> None:
if self.cleanup_directory:
Expand Down Expand Up @@ -99,7 +102,6 @@ def get_due_tasks(self, now: datetime) -> list[TaskType]:

def get_tasks(self) -> list[TaskType]:
tasks: list[tuple[TaskType, os.stat_result]] = []
max_dt = datetime.max.replace(tzinfo=UTC)
with os.scandir(self.directory) as scanner:
for entry in scanner:
if not entry.name.endswith(self.suffix) or not entry.is_file():
Expand All @@ -116,7 +118,7 @@ def get_tasks(self) -> list[TaskType]:
for task, _ in sorted(
tasks,
key=lambda task_stat: (
task_stat[0].next_run_time or max_dt,
task_stat[0].next_run_time or self.max_dt,
task_stat[1].st_mtime,
),
)
Expand Down
61 changes: 61 additions & 0 deletions docs/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,64 @@ This is also possible with asynchronous generators.

!!! Warning
Lifecycle tasks are only valid for the memory store. Generators cannot be pickled.


### Tasks with lifecycle in multi-processing environments

Natively we cannot add tasks with a lifecycle to other stores than the MemoryStore.
But we can combine both paradigms by using multiple stores or by keeping the lifecycle task out of stores.

We have a memory store for the lifecycle and optionally another store for the ticks in the lifecycle.
For multi-process synchronization we have the `asyncz.locks.FileProtectedLock`.

Here are some options:

#### Option 1: Multiple life-cycle tasks, tick only one

Here the setup of the life-cycle tasks is executed simultaneously in every worker process. For example, when setting up
database connections with four worker processes, there will be four connections to the database after the setup.

When this is no problem, this is the easiest way.

lifecycle:

```python title="Tick only one lifecycle task, with shutdown"
{!> ../docs_src/tasks/lifecycle_mp_tick_only_one.py !}
```

The memory store is only required for the shutdown task and can be left out when there are no shutdown tasks, or when the shutdown tasks use a globally referenceable function
like `lifecycle_tick`.


```python title="Tick only one lifecycle task, without shutdown"
{!> ../docs_src/tasks/lifecycle_mp_tick_only_one_simple.py !}
```

#### Option 2: Single life-cycle task, setup on demand

When there should be only one task creating resources — for example, connections to a database — this
option is the way to go.

Here we split the setup and the cleanup process each in two phases.

Next to the global setup/cleanup exists a file lock protected setup.
Here we start the clients and clean in the lock protected cleanup phase the clients up (e.g. disconnecting).

The clever part of the design is: whenever a process is stopped, the next scheduler picks up the work:

```python title="Only one concurrent lifecycle task"
{!> ../docs_src/tasks/lifecycle_mp_only_one_instance.py !}
```

Can we simplify this? Yes. By sacrificing execution accuracy of the background job we can just remove the store lock from the scheduler
and remove the file store.
When a worker process is stopped, it is possible here that one cycle is skipped. But if this is not a problem,
this is the way to go.

```python title="Only one concurrent lifecycle task with lower accuracy"
{!> ../docs_src/tasks/lifecycle_mp_only_one_instance_simple.py !}
```

#### Conclusions

These are only some of the options. In real-life setups it is even possible to mix the non-simplified options.
73 changes: 73 additions & 0 deletions docs_src/tasks/lifecycle_mp_only_one_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Option 2: a single concurrent life-cycle task, guarded by a file lock.

Every worker process runs this script, but only the process that grabs the
file lock executes the delayed setup and the periodic work; the other
processes idle until the lock becomes free (e.g. the holder was stopped).
"""

import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function
from asyncz.locks import FileLockProtected

# Create a scheduler with a multi-process safe file store plus a memory store.
# The memory store is needed for the shutdown task below (a bound generator
# method cannot be pickled into the file store).
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
        "memory": {"type": "memory"},
    },
)


def lifecycle_task(name: str):
    """Generator driving the task lifecycle: setup, per-tick work, cleanup.

    Prime with ``send(None)``; each ``send(True)`` runs one tick and
    ``send(False)`` breaks out and runs the cleanup phases.
    """
    # setup initial
    ...
    # initialize a file lock (multi-processing safe)
    file_lock = FileLockProtected(f"/tmp/asyncz_bg_{name}_{{pgrp}}.lock")
    while True:
        # don't block the generator: try to take the lock non-blockingly
        with file_lock.protected(False) as got_the_lock:
            if not got_the_lock:
                # another process holds the lock; idle until the next tick
                running = yield
                if not running:
                    break
                # retry grabbing the lock on the next round
                continue
            # delayed setup phase. Only executed when the lock was grabbed. e.g. for creating db clients.
            ...
            # we have to wrap generator.send so it can be registered as a task
            scheduler.add_task(
                make_function(generator.send), args=[False], trigger="shutdown", store="memory"
            )
            running = yield
            while running:
                # do something safe
                try:
                    # do something risky
                    ...
                except Exception:
                    # log
                    ...
                running = yield
            try:
                # cleanup the loop setup
                ...
            except Exception:
                # log
                ...
            # break the loop
            break
    # extra cleanup which is always executed except an exception was raised


# setup task: create the generator and advance it to the first yield
generator = lifecycle_task("foo")
generator.send(None)


# must be a globally referenceable function, so the file store can pickle a reference to it
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

# should be better a context manager or lifespan wrapper (.asgi) to cleanup on unexpected errors
with scheduler:
    ...
    # Now the shutdown task is executed and the generator progresses in the cleanup
60 changes: 60 additions & 0 deletions docs_src/tasks/lifecycle_mp_only_one_instance_simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Option 2 (simplified): a single concurrent life-cycle task without a file store.

Trades execution accuracy for simplicity: every worker process schedules its
own interval task, but only the process holding the file lock does real work.
"""

import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function
from asyncz.locks import FileLockProtected

# Create a scheduler (default memory store only; no cross-process store lock needed)
scheduler = AsyncIOScheduler()


def lifecycle_task(name: str):
    """Generator driving the task lifecycle: setup, per-tick work, cleanup.

    Prime with ``send(None)``; each ``send(True)`` runs one tick and
    ``send(False)`` breaks out and runs the cleanup phases.
    """
    # setup initial
    ...
    # initialize a file lock (multi-processing safe)
    file_lock = FileLockProtected(f"/tmp/asyncz_bg_{name}_{{pgrp}}.lock")
    while True:
        # don't block the generator: try to take the lock non-blockingly
        with file_lock.protected(False) as got_the_lock:
            if not got_the_lock:
                # another process holds the lock; idle until the next tick
                running = yield
                if not running:
                    break
                # retry grabbing the lock on the next round
                continue
            # delayed setup phase. Only executed when the lock was grabbed. e.g. for creating db clients.
            ...
            # we have to wrap generator.send so it can be registered as a task
            scheduler.add_task(make_function(generator.send), args=[False], trigger="shutdown")
            running = yield
            while running:
                # do something safe
                try:
                    # do something risky
                    ...
                except Exception:
                    # log
                    ...
                running = yield
            try:
                # cleanup the loop setup
                ...
            except Exception:
                # log
                ...
            # break the loop
            break
    # extra cleanup which is always executed except an exception was raised


# setup task: create the generator and advance it to the first yield
generator = lifecycle_task("foo")
generator.send(None)


# Run every 5 minutes (memory store, so the wrapped bound method needs no pickling)
scheduler.add_task(make_function(generator.send), args=[True], trigger="interval", minutes=5)

# should be better a context manager or lifespan wrapper (.asgi) to cleanup on unexpected errors
with scheduler:
    ...
    # Now the shutdown task is executed and the generator progresses in the cleanup
46 changes: 46 additions & 0 deletions docs_src/tasks/lifecycle_mp_tick_only_one.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Option 1: every process runs its own life-cycle task; only one is ticked.

The interval task lives in the multi-process file store, so only one process
at a time executes the tick; the shutdown task lives in the per-process
memory store because the bound generator method cannot be pickled.
"""

import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler with a multi-process safe file store plus a memory store
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
        "memory": {"type": "memory"},
    },
)


def lifecycle_task():
    """Generator driving the task lifecycle: setup once, one unit of work per
    ``send(True)``, cleanup after ``send(False)``."""
    # setup
    ...
    # we have to wrap generator.send so it can be registered as a task
    scheduler.add_task(
        make_function(generator.send), args=[False], trigger="shutdown", store="memory"
    )
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task: create the generator and advance it to the first yield
generator = lifecycle_task()
generator.send(None)


# must be a globally referenceable function, so the file store can pickle a reference to it
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()
41 changes: 41 additions & 0 deletions docs_src/tasks/lifecycle_mp_tick_only_one_simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler with a multi-process safe file store; lock_path
# presumably expands {pgrp}/{store} per process group and store — TODO confirm
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
    },
)


def lifecycle_task():
    """Minimal lifecycle generator.

    Prime with ``send(None)``; each subsequent ``send(True)`` performs one
    unit of work, and ``send(False)`` leaves the loop and runs the cleanup
    phase (raising ``StopIteration`` to the caller).
    """
    # setup
    ...
    while True:
        keep_running = yield
        if not keep_running:
            break
        # do something
    # cleanup


# setup task: create the generator and advance it to the first yield
generator = lifecycle_task()
generator.send(None)


# must be a globally referenceable function, so the file store can pickle a reference to it
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes; the shared file store ensures only one process ticks
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

scheduler.start()
...
# NOTE(review): unlike the other variants, no shutdown task is registered here,
# so the generator never reaches its cleanup phase — the original comment
# ("the shutdown task is executed") looks like a copy-paste; confirm intent.
scheduler.stop()

0 comments on commit 6dce424

Please sign in to comment.