Skip to content

Commit

Permalink
subdivide locks code
Browse files Browse the repository at this point in the history
  • Loading branch information
davmlaw committed Aug 9, 2024
1 parent c49beda commit 420c06a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
5 changes: 4 additions & 1 deletion annotation/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,10 @@ class AnnotationRangeLock(models.Model):
def __str__(self):
    """Describe the lock: its version, its variant ID range and, when set, its count."""
    min_v = self.min_variant_id
    max_v = self.max_variant_id
    s = f"AnnotationRangeLock: (v. {self.version}) {min_v} - {max_v}"
    # count may be unset; only mention it when there is a value to show
    if self.count is not None:
        s += f" (count={self.count})"
    return s

@staticmethod
def release_variant(variant: Variant):
Expand Down
40 changes: 39 additions & 1 deletion annotation/tasks/annotation_scheduler_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
from django.conf import settings
from django.core.cache import cache

from annotation.annotation_version_querysets import get_variants_qs_for_annotation
from annotation.annotation_versions import get_annotation_range_lock_and_unannotated_count
from annotation.models import AnnotationRun, VariantAnnotationPipelineType
from annotation.models.models import AnnotationVersion, AnnotationRangeLock
from annotation.tasks.annotate_variants import annotate_variants
from library.log_utils import log_traceback
from snpdb.models import GenomeBuild, ImportStatus, Sample, VCF
from snpdb.models import GenomeBuild, ImportStatus, Sample, VCF, Variant


@celery.shared_task(queue='scheduling_single_worker')
Expand Down Expand Up @@ -87,3 +88,40 @@ def _handle_variant_annotation_version(variant_annotation_version):
logging.info("No waiting vcfs or samples (this is good!)")

return range_lock


def subdivide_annotation_range_lock(arl: AnnotationRangeLock, minimum_size=1000):
    """Split an annotation range lock in two at the midpoint of its variant ID range.

    Sometimes an annotation run crashes or times out; splitting its lock in half
    lets each half be retried as a smaller run. ``arl`` keeps the lower part of
    the range and a new AnnotationRangeLock is created for the upper part.

    Side effects: deletes the annotation runs attached to ``arl``, creates the
    new lock, and saves ``arl`` with its lowered ``max_variant`` (and ``count``,
    when it has one).

    :param arl: the lock to split (modified in place and saved).
    :param minimum_size: smallest variant ID span (max - min) that may be split.
    :raises ValueError: if the span is below ``minimum_size``, or if there is no
        variant on one side of the midpoint to act as a lock boundary.
    """
    min_variant_id = int(arl.min_variant_id)
    max_variant_id = int(arl.max_variant_id)
    size = max_variant_id - min_variant_id
    if size < minimum_size:
        raise ValueError(f"Cannot subdivide {arl} below minimum size of {minimum_size}")

    logging.info("Subdividing %s", arl)

    # Integer midpoint of the ID range (floor division keeps it a valid pk value)
    halfway_point = min_variant_id + size // 2

    # Work out (and validate) both boundary variants BEFORE deleting anything,
    # so a lock that cannot be split is left untouched.
    in_range_qs = Variant.objects.filter(pk__gte=min_variant_id, pk__lte=max_variant_id)
    first_at_or_above_halfway = in_range_qs.filter(pk__gte=halfway_point).order_by("pk").first()
    last_below_halfway = in_range_qs.filter(pk__lt=halfway_point).order_by("pk").last()
    if first_at_or_above_halfway is None or last_below_halfway is None:
        raise ValueError(f"Cannot subdivide {arl}: no variant on one side of halfway point {halfway_point}")

    # Delete all existing annotation runs.
    # Managers do not expose delete(), so go through a queryset via all().
    res = arl.annotationrun_set.all().delete()
    logging.info("Deleted attached annotation runs: %s", res)

    # Count what still needs annotating in the upper half (done after the runs
    # are deleted, keeping the original ordering of these two steps).
    av = arl.version.get_any_annotation_version()
    unannotated_qs = get_variants_qs_for_annotation(av, min_variant_id=first_at_or_above_halfway.pk,
                                                    max_variant_id=arl.max_variant.pk)

    top_half_count = unannotated_qs.count()
    new_lock = AnnotationRangeLock.objects.create(version=arl.version,
                                                  min_variant=first_at_or_above_halfway,
                                                  max_variant=arl.max_variant,
                                                  count=top_half_count)

    # Shrink the existing lock so it covers only the lower half
    arl.max_variant = last_below_halfway
    if arl.count is not None:  # count may be unset; nothing to adjust then
        arl.count -= top_half_count
    arl.save()

    logging.info("Shifted old lock down: %s", arl)
    logging.info("New lock created: %s", new_lock)

0 comments on commit 420c06a

Please sign in to comment.