Skip to content

Commit

Permalink
Add retries for saving locked objects
Browse files Browse the repository at this point in the history
  • Loading branch information
jmsmkn committed Jan 9, 2025
1 parent 7328639 commit ff5385d
Showing 1 changed file with 48 additions and 11 deletions.
59 changes: 48 additions & 11 deletions app/grandchallenge/challenges/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import functools
import random
import time

from django.contrib.auth import get_user_model
from django.db import transaction
from django.db.models import Count, Max
from psycopg.errors import LockNotAvailable

from grandchallenge.challenges.costs import (
annotate_compute_costs_and_storage_size,
Expand Down Expand Up @@ -56,25 +61,57 @@ def update_challenge_results_cache():
)


def retry_with_backoff(exceptions, max_attempts=5, base_delay=1, max_delay=10):
    """Build a decorator that retries a callable with exponential backoff.

    Args:
        exceptions: Exception class, or tuple of classes, that triggers a
            retry. Any other exception propagates immediately.
        max_attempts: Total number of calls to make (first call included).
            Must be at least 1.
        base_delay: Backoff in seconds before the second attempt; it doubles
            after every further failed attempt.
        max_delay: Upper bound in seconds for the exponential backoff term.
            Random jitter of up to the same amount is added on top, so a
            single sleep can last up to ``2 * max_delay``.

    Returns:
        A decorator. The decorated callable returns whatever the wrapped
        callable returns on its first successful attempt.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1. Without this check
            the wrapped callable would never be invoked and the wrapper would
            silently return ``None``.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        # Out of attempts: surface the last failure unchanged.
                        raise

                    # Exponential backoff capped at max_delay, plus random
                    # jitter so that concurrent retriers do not wake up in
                    # lockstep.
                    delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
                    jitter = random.uniform(0, delay)
                    time.sleep(delay + jitter)

        return wrapper

    return decorator


@acks_late_2xlarge_task
def update_compute_costs_and_storage_size():
    """Refresh the stored cost and size fields of challenges and phases.

    Each challenge returned by ``Challenge.objects.with_available_compute()``
    and each phase is annotated and then saved inside its own transaction.
    A save that raises ``LockNotAvailable`` is retried with backoff; once the
    attempts are exhausted the exception propagates and aborts the task.
    """
    for challenge in Challenge.objects.with_available_compute().iterator():
        with transaction.atomic():
            annotate_compute_costs_and_storage_size(challenge=challenge)

            # NOTE(review): Django normally re-raises driver errors as
            # django.db.utils.OperationalError with the psycopg error as
            # __cause__ - confirm LockNotAvailable really reaches the
            # retry handler.
            @retry_with_backoff((LockNotAvailable,))
            def save_challenge():
                # The nested atomic block creates a savepoint, so a failed
                # attempt is rolled back before the next one. Without it the
                # outer transaction stays aborted and every retry would fail.
                with transaction.atomic():
                    challenge.save(
                        update_fields=(
                            "size_in_storage",
                            "size_in_registry",
                            "compute_cost_euro_millicents",
                        )
                    )

            save_challenge()

    for phase in Phase.objects.iterator():
        with transaction.atomic():
            annotate_job_duration_and_compute_costs(phase=phase)

            @retry_with_backoff((LockNotAvailable,))
            def save_phase():
                # Same savepoint-per-attempt pattern as for challenges.
                with transaction.atomic():
                    phase.save(
                        update_fields=(
                            "average_algorithm_job_duration",
                            "compute_cost_euro_millicents",
                        )
                    )

            save_phase()

0 comments on commit ff5385d

Please sign in to comment.