Skip to content

Commit 245df46

Browse files
amurekipre-commit-ci[bot]codingjoe
authored
Narrow DB locks from table to specific rows (#148)
* Drop Prometheus middleware from dramatiq settings Prometheus client became optional in dramatiq v2: https://github.com/Bogdanp/dramatiq/releases/tag/v2.0.0 * Add fail_fast=True to maintain previous behaviour * Use row not table locks * Update Dramatiq to v2+ --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Johannes Maron <johannes@maron.family>
1 parent b4f1880 commit 245df46

File tree

4 files changed

+13
-7
lines changed

4 files changed

+13
-7
lines changed

joeflow/runner/dramatiq.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,17 @@ class RetryError(dramatiq.errors.Retry):
3232
def _dramatiq_task_runner(task_pk, workflow_pk, retries=0):
3333
Task = apps.get_model("joeflow", "Task")
3434
with transaction.atomic():
35-
task = Task.objects.select_for_update().get(pk=task_pk, completed=None)
35+
task = (
36+
Task.objects.filter(pk=task_pk, completed=None)
37+
.select_for_update(nowait=True)
38+
.get()
39+
)
3640

3741
workflow = (
3842
task.content_type.model_class()
39-
.objects.select_for_update(nowait=True)
40-
.get(pk=workflow_pk)
43+
.objects.filter(pk=workflow_pk)
44+
.select_for_update(nowait=True)
45+
.get()
4146
)
4247

4348
try:

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ test = [
6363
docs = [
6464
"celery>=4.2.0",
6565
"django-reversion",
66-
"dramatiq",
66+
"dramatiq>=2.0.0",
6767
"django_dramatiq",
6868
"redis",
6969
"graphviz>=0.18",
@@ -78,7 +78,7 @@ celery = [
7878
"celery>=4.2.0",
7979
]
8080
dramatiq = [
81-
"dramatiq",
81+
"dramatiq>=2.0.0",
8282
"django_dramatiq",
8383
]
8484

tests/conftest.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ def stub_worker(monkeypatch, settings, _runner):
4848
class Meta:
4949
@staticmethod
5050
def wait():
51-
broker.join(settings.JOEFLOW_CELERY_QUEUE_NAME, timeout=60000)
51+
broker.join(
52+
settings.JOEFLOW_CELERY_QUEUE_NAME, timeout=60000, fail_fast=False
53+
)
5254
worker.join()
5355

5456
yield Meta

tests/testapp/settings.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@
132132
DRAMATIQ_BROKER = {
133133
"BROKER": os.getenv("DRAMATIQ_BROKER", "dramatiq.brokers.redis.RedisBroker"),
134134
"MIDDLEWARE": [
135-
"dramatiq.middleware.Prometheus",
136135
"dramatiq.middleware.AgeLimit",
137136
"dramatiq.middleware.TimeLimit",
138137
"dramatiq.middleware.Callbacks",

0 commit comments

Comments
 (0)