Skip to content

Commit

Permalink
Merge pull request #195 from reef-technologies/dead-letter-queue
Browse files Browse the repository at this point in the history
Add dead letter queue support
  • Loading branch information
agoncharov-reef authored Oct 3, 2024
2 parents daf2b91 + e9b79fb commit e5c44ba
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 7 deletions.
24 changes: 24 additions & 0 deletions {{cookiecutter.repostory_name}}/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,30 @@ If an SSO provider supports the OIDC protocol, it can be set up as a generic OID
2. Allauth "social users" are just an extension to regular django users. When someone logs in via allauth, a django user model will also be created for them.
3. A "profile" page is available at `/accounts/`

{% endif %}

{% if cookiecutter.use_celery == 'y' %}
# Background tasks with Celery

## Dead letter queue

<details>
There is a special queue named `dead_letter` that is used to store tasks
that failed for some reason.

A task should be annotated with `on_failure=send_to_dead_letter_queue`.
Once the reason for the task's failure is fixed, the task can be re-processed
by moving tasks from the dead letter queue to the main one ("celery"):

manage.py move_tasks "dead_letter" "celery"

If a task fails again, it will be put back into the dead letter queue.

To flush all tasks in a specific queue, use

manage.py flush_tasks "dead_letter"
</details>

{% endif %}
{% if cookiecutter.monitoring == 'y' %}
# Monitoring
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
{% endif -%}
from django.conf import settings
from django_structlog.celery.steps import DjangoStructLogInitStep
from more_itertools import chunked
{%- if cookiecutter.monitoring == "y" %}
from prometheus_client import multiprocess
{% endif %}
Expand All @@ -16,7 +17,6 @@
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "{{cookiecutter.django_project_name}}.settings")

app = Celery("{{cookiecutter.django_project_name}}")

app.config_from_object("django.conf:settings", namespace="CELERY")
app.steps["worker"].add(DjangoStructLogInitStep)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Expand All @@ -33,10 +33,34 @@ def receiver_setup_logging(loglevel, logfile, format, colorize, **kwargs): # pr
configure_structlog()


def route_task(name, args, kwargs, options, task=None, **kw):
    """Celery task router: send every task to the default "celery" queue."""
    destination = {"queue": "celery"}
    return destination
{% if cookiecutter.monitoring == "y" %}
def get_tasks_in_queue(queue_name: str) -> list[bytes]:
    """Return the raw (serialized) tasks currently sitting in *queue_name*."""
    with app.pool.acquire(block=True) as conn:
        redis_client = conn.default_channel.client
        return redis_client.lrange(queue_name, 0, -1)


def get_num_tasks_in_queue(queue_name: str) -> int:
    """Return how many tasks are currently waiting in *queue_name*."""
    with app.pool.acquire(block=True) as conn:
        redis_client = conn.default_channel.client
        return redis_client.llen(queue_name)


def move_tasks(source_queue: str, destination_queue: str, chunk_size: int = 100) -> None:
    """Move all tasks from one Redis-backed queue to another.

    Takes a snapshot of *source_queue* and re-pushes each task onto
    *destination_queue*, removing it from the source. Tasks enqueued
    after the snapshot is taken are not moved.

    :param source_queue: name of the Redis list to drain.
    :param destination_queue: name of the Redis list to push tasks onto.
    :param chunk_size: how many tasks to transfer per Redis pipeline,
        keeping any single round trip bounded.
    """
    with app.pool.acquire(block=True) as conn:
        client = conn.default_channel.client
        tasks = client.lrange(source_queue, 0, -1)

        for chunk in chunked(tasks, chunk_size):
            with client.pipeline() as pipe:
                # Queue the commands on the pipeline object (not `client`)
                # so the whole chunk is sent in one round trip. The previous
                # code issued them on `client` directly, which made
                # `pipe.execute()` a no-op and defeated the batching.
                for task in chunk:
                    pipe.rpush(destination_queue, task)
                    pipe.lrem(source_queue, 1, task)
                pipe.execute()


def flush_tasks(queue_name: str) -> None:
    """Drop every pending task in *queue_name* by deleting its Redis key."""
    with app.pool.acquire(block=True) as conn:
        redis_client = conn.default_channel.client
        redis_client.delete(queue_name)

{% if cookiecutter.monitoring == "y" %}
@worker_process_shutdown.connect
def child_exit(pid, **kw):
    # Notify the prometheus multiprocess collector that this worker process
    # has exited so its per-process metric files can be marked dead and
    # cleaned up instead of being aggregated forever.
    multiprocess.mark_process_dead(pid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

{% if cookiecutter.use_celery == "y" -%}
# from celery.schedules import crontab
from kombu import Queue

{% endif %}
{%- if cookiecutter.use_allauth == "y" -%}
from django.urls import reverse_lazy
Expand Down Expand Up @@ -337,7 +339,12 @@ def wrapped(*args, **kwargs):
# 'options': {"time_limit": 300},
# },
}
CELERY_TASK_ROUTES = ["{{cookiecutter.django_project_name}}.celery.route_task"]
CELERY_TASK_CREATE_MISSING_QUEUES = False
CELERY_TASK_QUEUES = (Queue("celery"), Queue("worker"), Queue("dead_letter"))
CELERY_TASK_DEFAULT_EXCHANGE = "celery"
CELERY_TASK_DEFAULT_ROUTING_KEY = "celery"
CELERY_TASK_ANNOTATIONS = {"*": {"acks_late": True, "reject_on_worker_lost": True}}
CELERY_TASK_ROUTES = {"*": {"queue": "celery"}}
CELERY_TASK_TIME_LIMIT = int(timedelta(minutes=5).total_seconds())
CELERY_TASK_ALWAYS_EAGER = env.bool("CELERY_TASK_ALWAYS_EAGER", default=False)
CELERY_ACCEPT_CONTENT = ["json"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from django.core.management.base import BaseCommand

from {{cookiecutter.django_project_name}}.celery import flush_tasks, get_num_tasks_in_queue


class Command(BaseCommand):
    """Management command that empties a Celery task queue."""

    help = "Flush task queue."

    def add_arguments(self, parser) -> None:
        parser.add_argument("queue", type=str, help="Queue name to flush")

    def handle(self, *args, **kwargs):
        queue_name = kwargs["queue"]
        pending = get_num_tasks_in_queue(queue_name)
        self.stdout.write(f"Found {pending} tasks in '{queue_name}' queue")
        if pending:
            flush_tasks(queue_name)
            self.stdout.write("All done")
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from django.core.management.base import BaseCommand

from {{cookiecutter.django_project_name}}.celery import get_num_tasks_in_queue, move_tasks


class Command(BaseCommand):
    """Management command that moves tasks between queues (e.g. to
    reschedule tasks parked in the dead letter queue)."""

    help = "Reschedule dead letter tasks."

    def add_arguments(self, parser) -> None:
        parser.add_argument("source_queue", type=str, help="Source queue name")
        parser.add_argument("destination_queue", type=str, help="Destination queue name")

    def handle(self, *args, **kwargs):
        source_queue = kwargs["source_queue"]
        destination_queue = kwargs["destination_queue"]

        pending = get_num_tasks_in_queue(source_queue)
        self.stdout.write(f"Found {pending} tasks in '{source_queue}' queue")
        if pending:
            move_tasks(source_queue, destination_queue)
            self.stdout.write("All done")
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
{%- if cookiecutter.monitoring == "y" -%}
import glob
import os
from functools import partial

import prometheus_client
from django.conf import settings
from django.http import HttpResponse
from django_prometheus.exports import ExportToDjangoView
from django_prometheus.migrations import ExportMigrations
from prometheus_client import multiprocess

from ..celery import get_num_tasks_in_queue


class RecursiveMultiProcessCollector(multiprocess.MultiProcessCollector):
"""A multiprocess collector that scans the directory recursively"""
Expand All @@ -33,4 +37,14 @@ def metrics_view(request):
)
else:
return ExportToDjangoView(request)


# Register one prometheus gauge per configured Celery queue; each gauge
# reports the queue's current length (read lazily at scrape time).
num_tasks_in_queue = {}
for task_queue in settings.CELERY_TASK_QUEUES:
    queue_gauge = prometheus_client.Gauge(
        f"celery_{task_queue.name}_queue_len",
        f"How many tasks are there in '{task_queue.name}' queue",
    )
    queue_gauge.set_function(partial(get_num_tasks_in_queue, task_queue.name))
    num_tasks_in_queue[task_queue.name] = queue_gauge
{% endif %}
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
{%- if cookiecutter.use_celery == 'y' -%}
{%- if cookiecutter.use_celery == "y" -%}
import structlog
from celery import Task
from celery.utils.log import get_task_logger

from {{cookiecutter.django_project_name}}.celery import app

logger = structlog.wrap_logger(get_task_logger(__name__))


@app.task
def send_to_dead_letter_queue(task: Task, exc, task_id, args, kwargs, einfo):
    """Hook to put a task into dead letter queue when it fails.

    Intended to be passed as ``on_failure=send_to_dead_letter_queue`` when
    declaring a task; the failing task is then re-enqueued (with its original
    args/kwargs) onto the "dead_letter" queue for later reprocessing.
    """
    if task.app.conf.task_always_eager:
        return  # do not run failed task again in eager mode

    # Log the full failure context before re-enqueueing, so operators can
    # see why the task landed in the dead letter queue.
    logger.warning(
        "Sending failed task to dead letter queue",
        task=task,
        exc=exc,
        task_id=task_id,
        args=args,
        kwargs=kwargs,
        einfo=einfo,
    )
    task.apply_async(args=args, kwargs=kwargs, queue="dead_letter")


@app.task(on_failure=send_to_dead_letter_queue)
def demo_task(x, y):
    """Add two numbers; failures are routed to the dead letter queue."""
    logger.info("adding two numbers", x=x, y=y)
    total = x + y
    return total
Expand Down
1 change: 1 addition & 0 deletions {{cookiecutter.repostory_name}}/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies = [
"sentry-sdk==1.3.0",
"ipython~=8.14.0",
"nox==2023.4.22",
"more-itertools~=10.3.0",
{% if cookiecutter.monitoring == "y" -%}
"psutil>=5.9.8",
"prometheus-client~=0.17.0",
Expand Down

0 comments on commit e5c44ba

Please sign in to comment.