Skip to content

Commit

Permalink
Merge branch 'master' into TP2000-1680-no-comm-code-workbaskets
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjamc authored Jan 27, 2025
2 parents c21757f + 2cfaeb2 commit 27db5b9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 298 deletions.
131 changes: 51 additions & 80 deletions publishing/models/packaged_workbasket.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from notifications.models import EnvelopeRejectedNotification
from notifications.models import NotificationLog
from publishing import models as publishing_models
from publishing.models.decorators import refresh_after
from publishing.models.decorators import save_after
from publishing.models.decorators import skip_notifications_if_disabled
from publishing.models.state import ProcessingState
Expand Down Expand Up @@ -438,7 +437,6 @@ def begin_processing_condition_at_position_1(self) -> bool:
"""Django FSM condition: Instance must be at position 1 in order to
complete the begin_processing transition to CURRENTLY_PROCESSING."""

self.refresh_from_db(fields=["position"])
return self.position == 1

def begin_processing_condition_no_instances_currently_processing(self) -> bool:
Expand Down Expand Up @@ -478,11 +476,9 @@ def begin_processing(self):
multiple instances it's necessary for this method to perform a save()
operation upon successful transitions.
"""
instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)
instance.processing_started_at = make_aware(datetime.now())
instance.save()
PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.processing_started_at = make_aware(datetime.now())
self.save()

@create_envelope_on_completed_processing
@save_after
Expand Down Expand Up @@ -622,7 +618,6 @@ def cds_notified_notification_log(self) -> NotificationLog:

@atomic
@create_envelope_on_new_top
@refresh_after
def pop_top(self) -> "PackagedWorkBasket":
"""
Pop the top-most instance, shuffling all remaining queued instances
Expand All @@ -631,34 +626,23 @@ def pop_top(self) -> "PackagedWorkBasket":
Management of the popped instance's `processing_state` is not altered by
this function and should be managed separately by the caller.
"""

instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)

if instance.position != 1:
if self.position != 1:
raise PackagedWorkBasketInvalidQueueOperation(
"Unable to pop instance at position {instance.position} in queue "
"Unable to pop instance at position {self.position} in queue "
"because it is not at position 1.",
)

instance.position = 0
instance.save()

to_update = list(
PackagedWorkBasket.objects.select_for_update(nowait=True)
.filter(position__gt=1)
.values_list("pk", flat=True),
)
PackagedWorkBasket.objects.filter(pk__in=to_update).update(
PackagedWorkBasket.objects.select_for_update(nowait=True).filter(
position__gt=0,
).update(
position=F("position") - 1,
)
self.refresh_from_db()

return instance
return self

@atomic
@create_envelope_on_new_top
@refresh_after
def remove_from_queue(self) -> "PackagedWorkBasket":
"""
Remove instance from the queue, shuffling all successive queued
Expand All @@ -668,111 +652,98 @@ def remove_from_queue(self) -> "PackagedWorkBasket":
this function and should be managed separately by the caller.
"""

instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)
PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()

if instance.position == 0:
if self.position == 0:
raise PackagedWorkBasketInvalidQueueOperation(
"Unable to remove instance with a position value of 0 from "
"queue because 0 indicates that it is not a queue member.",
)

current_position = instance.position
instance.position = 0
instance.save()
current_position = self.position
self.position = 0
self.save()

to_update = list(
PackagedWorkBasket.objects.select_for_update(nowait=True)
.filter(position__gt=current_position)
.values_list("pk", flat=True),
)
PackagedWorkBasket.objects.filter(pk__in=to_update).update(
PackagedWorkBasket.objects.select_for_update(nowait=True).filter(
position__gt=current_position,
).update(
position=F("position") - 1,
)
self.refresh_from_db()

return instance
return self

@atomic
@create_envelope_on_new_top
@refresh_after
def promote_to_top_position(self) -> "PackagedWorkBasket":
"""Promote the instance to the top position of the package processing
queue so that it occupies position 1."""

instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)
PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()

if instance.position <= 1:
return instance
if self.position <= 1:
return self

current_position = instance.position
position = self.position

to_update = list(
PackagedWorkBasket.objects.select_for_update(nowait=True)
.filter(Q(position__gte=1) & Q(position__lt=current_position))
.values_list("pk", flat=True),
)
PackagedWorkBasket.objects.filter(pk__in=to_update).update(
position=F("position") + 1,
)
PackagedWorkBasket.objects.select_for_update(nowait=True).filter(
Q(position__gte=1) & Q(position__lt=position),
).update(position=F("position") + 1)

instance.position = 1
instance.save()
self.position = 1
self.save()
self.refresh_from_db()

return instance
return self

@atomic
@create_envelope_on_new_top
@refresh_after
def promote_position(self) -> "PackagedWorkBasket":
"""Promote the instance by one position up the package processing
queue."""

instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)
PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()

if instance.position <= 1:
return instance
if self.position <= 1:
return self

obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
position=instance.position - 1,
position=self.position - 1,
)
obj_to_swap.position += 1
instance.position -= 1

self.position -= 1
PackagedWorkBasket.objects.bulk_update(
[instance, obj_to_swap],
[self, obj_to_swap],
["position"],
)
self.refresh_from_db()

return instance
return self

@atomic
@create_envelope_on_new_top
@refresh_after
def demote_position(self) -> "PackagedWorkBasket":
"""Demote the instance by one position down the package processing
queue."""

instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)
PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()

if instance.position in {0, PackagedWorkBasket.objects.max_position()}:
return instance
if self.position in {0, PackagedWorkBasket.objects.max_position()}:
return self

obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
position=instance.position + 1,
position=self.position + 1,
)
obj_to_swap.position -= 1
instance.position += 1

self.position += 1
PackagedWorkBasket.objects.bulk_update(
[instance, obj_to_swap],
[self, obj_to_swap],
["position"],
)
self.refresh_from_db()

return instance
return self
Loading

0 comments on commit 27db5b9

Please sign in to comment.