From ff5385d6c464de728ed5ea596191e0a01b126922 Mon Sep 17 00:00:00 2001 From: James Meakin <12661555+jmsmkn@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:55:04 +0100 Subject: [PATCH] Add retries for saving locked objects --- app/grandchallenge/challenges/tasks.py | 59 +++++++++++++++++++++----- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/app/grandchallenge/challenges/tasks.py b/app/grandchallenge/challenges/tasks.py index e84d4b1bc..36b9bd809 100644 --- a/app/grandchallenge/challenges/tasks.py +++ b/app/grandchallenge/challenges/tasks.py @@ -1,6 +1,11 @@ +import functools +import random +import time + from django.contrib.auth import get_user_model from django.db import transaction from django.db.models import Count, Max +from psycopg.errors import LockNotAvailable from grandchallenge.challenges.costs import ( annotate_compute_costs_and_storage_size, @@ -56,25 +61,57 @@ def update_challenge_results_cache(): ) +def retry_with_backoff(exceptions, max_attempts=5, base_delay=1, max_delay=10): + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + for attempt in range(1, max_attempts + 1): + try: + return func(*args, **kwargs) + except exceptions: + if attempt == max_attempts: + raise + + delay = min(base_delay * (2 ** (attempt - 1)), max_delay) + jitter = random.uniform(0, delay) + total_delay = delay + jitter + + time.sleep(total_delay) + + return wrapper + + return decorator + + @acks_late_2xlarge_task def update_compute_costs_and_storage_size(): for challenge in Challenge.objects.with_available_compute().iterator(): with transaction.atomic(): annotate_compute_costs_and_storage_size(challenge=challenge) - challenge.save( - update_fields=( - "size_in_storage", - "size_in_registry", - "compute_cost_euro_millicents", + + @retry_with_backoff((LockNotAvailable,)) + def save_challenge(): + challenge.save( + update_fields=( + "size_in_storage", + "size_in_registry", + "compute_cost_euro_millicents", + ) ) - ) + + save_challenge() for phase in Phase.objects.iterator(): with transaction.atomic(): annotate_job_duration_and_compute_costs(phase=phase) - phase.save( - update_fields=( - "average_algorithm_job_duration", - "compute_cost_euro_millicents", + + @retry_with_backoff((LockNotAvailable,)) + def save_phase(): + phase.save( + update_fields=( + "average_algorithm_job_duration", + "compute_cost_euro_millicents", + ) ) - ) + + save_phase()