def __str__(self):
    """Return a one-line description of this lock.

    Shows the version and the min/max variant ids, followed by the count
    only when a count has been set.
    """
    lower, upper = self.min_variant_id, self.max_variant_id
    pieces = [f"AnnotationRangeLock: (v. {self.version}) {lower} - {upper}"]
    if self.count is not None:
        # Count is optional; leave the suffix off entirely when it is unset
        pieces.append(f" (count={self.count})")
    return "".join(pieces)
def subdivide_annotation_range_lock(arl: AnnotationRangeLock, minimum_size=1000):
    """Split an annotation range lock in two at the midpoint of its variant id range.

    Sometimes an annotation run crashes or times out; splitting the lock lets the
    range be retried as smaller runs. ``arl`` keeps the lower part (its
    ``max_variant`` is moved down and saved) and a new AnnotationRangeLock is
    created for the upper part. All annotation runs attached to ``arl`` are deleted.

    :param arl: the lock to split; modified and saved in place
    :param minimum_size: smallest variant id span (max id - min id) that may be split
    :raises ValueError: if the span is below ``minimum_size``, or if no variant
        exists on one side of the midpoint within the lock's range
    """
    # Function-scope import so this block does not depend on the file's import section
    from django.db import transaction

    min_variant_id = int(arl.min_variant_id)
    max_variant_id = int(arl.max_variant_id)
    size = max_variant_id - min_variant_id
    if size < minimum_size:
        raise ValueError(f"Cannot subdivide {arl} below minimum size of {minimum_size}")

    logging.info("Subdividing %s", arl)

    # Integer division: primary keys are integers and we want the true midpoint
    halfway_point = min_variant_id + size // 2

    # Find the boundary variants BEFORE touching anything, so a failure here leaves
    # the lock and its runs intact. Both lookups are confined to the lock's own range.
    in_range_qs = Variant.objects.filter(pk__gte=min_variant_id, pk__lte=max_variant_id).order_by("pk")
    first_at_or_above_halfway = in_range_qs.filter(pk__gte=halfway_point).first()
    last_below_halfway = in_range_qs.filter(pk__lt=halfway_point).last()
    if first_at_or_above_halfway is None or last_below_halfway is None:
        raise ValueError(f"Cannot subdivide {arl}: need variants on both sides of {halfway_point}")

    # Delete + create + save must succeed or fail together
    with transaction.atomic():
        # Delete all existing annotation runs (managers have no delete(); use a queryset)
        res = arl.annotationrun_set.all().delete()
        logging.info("Deleted attached annotation runs: %s", res)

        # Counted after the delete, as in the original ordering
        av = arl.version.get_any_annotation_version()
        unannotated_qs = get_variants_qs_for_annotation(av, min_variant_id=first_at_or_above_halfway.pk,
                                                        max_variant_id=arl.max_variant.pk)
        top_half_count = unannotated_qs.count()
        new_lock = AnnotationRangeLock.objects.create(version=arl.version,
                                                      min_variant=first_at_or_above_halfway,
                                                      max_variant=arl.max_variant,
                                                      count=top_half_count)

        arl.max_variant = last_below_halfway
        if arl.count is not None:
            # count may be unset on a lock; only adjust it when there is a value to adjust
            arl.count -= top_half_count
        arl.save()

    logging.info("Shifted old lock down: %s", arl)
    logging.info("New lock created: %s", new_lock)