diff --git a/publishing/models/packaged_workbasket.py b/publishing/models/packaged_workbasket.py index 40e1d3148..cc24edfa9 100644 --- a/publishing/models/packaged_workbasket.py +++ b/publishing/models/packaged_workbasket.py @@ -31,7 +31,6 @@ from notifications.models import EnvelopeRejectedNotification from notifications.models import NotificationLog from publishing import models as publishing_models -from publishing.models.decorators import refresh_after from publishing.models.decorators import save_after from publishing.models.decorators import skip_notifications_if_disabled from publishing.models.state import ProcessingState @@ -438,7 +437,6 @@ def begin_processing_condition_at_position_1(self) -> bool: """Django FSM condition: Instance must be at position 1 in order to complete the begin_processing transition to CURRENTLY_PROCESSING.""" - self.refresh_from_db(fields=["position"]) return self.position == 1 def begin_processing_condition_no_instances_currently_processing(self) -> bool: @@ -478,11 +476,9 @@ def begin_processing(self): multiple instances it's necessary for this method to perform a save() operation upon successful transitions. """ - instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - pk=self.pk, - ) - instance.processing_started_at = make_aware(datetime.now()) - instance.save() + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) + self.processing_started_at = make_aware(datetime.now()) + self.save() @create_envelope_on_completed_processing @save_after @@ -622,7 +618,6 @@ def cds_notified_notification_log(self) -> NotificationLog: @atomic @create_envelope_on_new_top - @refresh_after def pop_top(self) -> "PackagedWorkBasket": """ Pop the top-most instance, shuffling all remaining queued instances @@ -631,34 +626,23 @@ def pop_top(self) -> "PackagedWorkBasket": Management of the popped instance's `processing_state` is not altered by this function and should be managed separately by the caller. """ - - instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - pk=self.pk, - ) - - if instance.position != 1: + if self.position != 1: raise PackagedWorkBasketInvalidQueueOperation( - "Unable to pop instance at position {instance.position} in queue " + "Unable to pop instance at position {self.position} in queue " "because it is not at position 1.", ) - instance.position = 0 - instance.save() - - to_update = list( - PackagedWorkBasket.objects.select_for_update(nowait=True) - .filter(position__gt=1) - .values_list("pk", flat=True), - ) - PackagedWorkBasket.objects.filter(pk__in=to_update).update( + PackagedWorkBasket.objects.select_for_update(nowait=True).filter( + position__gt=0, + ).update( position=F("position") - 1, ) + self.refresh_from_db() - return instance + return self @atomic @create_envelope_on_new_top - @refresh_after def remove_from_queue(self) -> "PackagedWorkBasket": """ Remove instance from the queue, shuffling all successive queued @@ -668,111 +652,98 @@ def remove_from_queue(self) -> "PackagedWorkBasket": this function and should be managed separately by the caller. """ - instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - pk=self.pk, - ) + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) + self.refresh_from_db() - if instance.position == 0: + if self.position == 0: raise PackagedWorkBasketInvalidQueueOperation( "Unable to remove instance with a position value of 0 from " "queue because 0 indicates that it is not a queue member.", ) - current_position = instance.position - instance.position = 0 - instance.save() + current_position = self.position + self.position = 0 + self.save() - to_update = list( - PackagedWorkBasket.objects.select_for_update(nowait=True) - .filter(position__gt=current_position) - .values_list("pk", flat=True), - ) - PackagedWorkBasket.objects.filter(pk__in=to_update).update( + PackagedWorkBasket.objects.select_for_update(nowait=True).filter( + position__gt=current_position, + ).update( position=F("position") - 1, ) + self.refresh_from_db() - return instance + return self @atomic @create_envelope_on_new_top - @refresh_after def promote_to_top_position(self) -> "PackagedWorkBasket": """Promote the instance to the top position of the package processing queue so that it occupies position 1.""" - instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - pk=self.pk, - ) + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) + self.refresh_from_db() - if instance.position <= 1: - return instance + if self.position <= 1: + return self - current_position = instance.position + position = self.position - to_update = list( - PackagedWorkBasket.objects.select_for_update(nowait=True) - .filter(Q(position__gte=1) & Q(position__lt=current_position)) - .values_list("pk", flat=True), - ) - PackagedWorkBasket.objects.filter(pk__in=to_update).update( - position=F("position") + 1, - ) + PackagedWorkBasket.objects.select_for_update(nowait=True).filter( + Q(position__gte=1) & Q(position__lt=position), + ).update(position=F("position") + 1) - instance.position = 1 - instance.save() + self.position = 1 + self.save() + self.refresh_from_db() - return instance + return self @atomic @create_envelope_on_new_top - @refresh_after def promote_position(self) -> "PackagedWorkBasket": """Promote the instance by one position up the package processing queue.""" - instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - pk=self.pk, - ) + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) + self.refresh_from_db() - if instance.position <= 1: - return instance + if self.position <= 1: + return self obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - position=instance.position - 1, + position=self.position - 1, ) obj_to_swap.position += 1 - instance.position -= 1 - + self.position -= 1 PackagedWorkBasket.objects.bulk_update( - [instance, obj_to_swap], + [self, obj_to_swap], ["position"], ) + self.refresh_from_db() - return instance + return self @atomic @create_envelope_on_new_top - @refresh_after def demote_position(self) -> "PackagedWorkBasket": """Demote the instance by one position down the package processing queue.""" - instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - pk=self.pk, - ) + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) + self.refresh_from_db() - if instance.position in {0, PackagedWorkBasket.objects.max_position()}: - return instance + if self.position in {0, PackagedWorkBasket.objects.max_position()}: + return self obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - position=instance.position + 1, + position=self.position + 1, ) obj_to_swap.position -= 1 - instance.position += 1 - + self.position += 1 PackagedWorkBasket.objects.bulk_update( - [instance, obj_to_swap], + [self, obj_to_swap], ["position"], ) + self.refresh_from_db() - return instance + return self diff --git a/publishing/tests/test_models.py b/publishing/tests/test_models.py index 2624523ca..dd7371d6f 100644 --- a/publishing/tests/test_models.py +++ b/publishing/tests/test_models.py @@ -1,5 +1,3 @@ -import threading -from functools import wraps from unittest import mock from unittest.mock import MagicMock from unittest.mock import patch @@ -7,7 +5,6 @@ import factory import freezegun import pytest -from django.db import OperationalError from django_fsm import TransitionNotAllowed from common.tests import factories @@ -472,221 +469,6 @@ def test_next_envelope_id(envelope_storage): assert Envelope.next_envelope_id() == "230002" -@pytest.mark.django_db(transaction=True) -class TestPackagingQueueRaceConditions: - """Tests that concurrent requests to reorder packaged workbaskets don't - result in duplicate or non-consecutive positions.""" - - NUM_THREADS: int = 2 - """The number of threads each test uses.""" - - THREAD_TIMEOUT: int = 5 - """The duration in seconds to wait for a thread to complete before timing - out.""" - - NUM_PACKAGED_WORKBASKETS: int = 5 - """The number of packaged workbaskets to create for each test.""" - - @pytest.fixture(autouse=True) - def setup(self, settings): - """Initialises a barrier to synchronise threads and creates packaged - workbaskets anew for each test.""" - settings.ENABLE_PACKAGING_NOTIFICATIONS = False - - self.unexpected_exceptions = [] - - self.barrier = threading.Barrier( - parties=self.NUM_THREADS, - timeout=self.THREAD_TIMEOUT, - ) - - for _ in range(self.NUM_PACKAGED_WORKBASKETS): - self._create_packaged_workbasket() - - self.packaged_workbaskets = PackagedWorkBasket.objects.filter( - processing_state__in=ProcessingState.queued_states(), - ) - - def _create_packaged_workbasket(self): - """Creates a new packaged workbasket with a unique - create_envelope_task_id.""" - with patch( - "publishing.tasks.create_xml_envelope_file.apply_async", - return_value=MagicMock(id=factory.Faker("uuid4")), - ): - factories.QueuedPackagedWorkBasketFactory() - - def assert_no_unexpected_exceptions(self): - """Asserts that no threads raised an unexpected exception.""" - assert ( - not self.unexpected_exceptions - ), f"Unexpected exception(s) raised: {self.unexpected_exceptions}" - - def assert_expected_positions(self): - """Asserts that positions in the packaging queue are both unique and in - consecutive sequence.""" - positions = list( - PackagedWorkBasket.objects.filter( - processing_state__in=ProcessingState.queued_states(), - ) - .order_by("position") - .values_list("position", flat=True), - ) - - assert len(set(positions)) == len(positions), "Duplicate positions found!" - - assert positions == list( - range(min(positions), max(positions) + 1), - ), "Non-consecutive positions found!" - - def synchronised(func): - """ - Decorator that ensures all threads wait until they can call their target - function in a synchronised fashion. - - Any unexpected exceptions raised during the execution of the decorated - function are stored for the individual test to re-raise. - """ - - @wraps(func) - def wrapper(self, *args, **kwargs): - try: - self.barrier.wait() - func(self, *args, **kwargs) - except (TransitionNotAllowed, OperationalError): - pass - except Exception as error: - self.unexpected_exceptions.append(error) - - return wrapper - - @synchronised - def synchronised_call( - self, - method_name: str, - packaged_workbasket: PackagedWorkBasket, - ): - """ - Thread-synchronised wrapper for the following `PackagedWorkBasket` - - instance methods: - - begin_processing - - abandon - - promote_to_top_position - - promote - - demote - """ - getattr(packaged_workbasket, method_name)() - - @synchronised - def synchronised_create_packaged_workbasket(self): - """Thread-synchronised wrapper method to create a new - `PackagedWorkbasket` instance.""" - self._create_packaged_workbasket() - - def execute_threads(self, threads: list[threading.Thread]): - """Starts a list of threads and waits for them to complete or - timeout.""" - for thread in threads: - thread.start() - - for thread in threads: - thread.join(timeout=self.THREAD_TIMEOUT) - if thread.is_alive(): - raise RuntimeError(f"Thread {thread.name} timed out.") - - def test_process_and_promote_to_top_packaged_workbaskets(self): - """Begins processing the top-most packaged workbasket while promoting to - the top the packaged workbasket in last place.""" - thread1 = threading.Thread( - target=self.synchronised_call, - kwargs={ - "method_name": "begin_processing", - "packaged_workbasket": self.packaged_workbaskets[0], - }, - name="BeginProcessingThread1", - ) - thread2 = threading.Thread( - target=self.synchronised_call, - kwargs={ - "method_name": "promote_to_top_position", - "packaged_workbasket": self.packaged_workbaskets[4], - }, - name="PromoteToTopThread2", - ) - - self.execute_threads(threads=[thread1, thread2]) - self.assert_no_unexpected_exceptions() - self.assert_expected_positions() - - def test_promote_and_promote_to_top_packaged_workbaskets(self): - """Promotes to the top the last-placed packaged workbasket while - promoting the one above it.""" - thread1 = threading.Thread( - target=self.synchronised_call, - kwargs={ - "method_name": "promote_to_top_position", - "packaged_workbasket": self.packaged_workbaskets[4], - }, - name="PromoteToTopThread1", - ) - thread2 = threading.Thread( - target=self.synchronised_call, - kwargs={ - "method_name": "begin_processing", - "packaged_workbasket": self.packaged_workbaskets[3], - }, - name="BeginProcessingThread2", - ) - - self.execute_threads(threads=[thread1, thread2]) - self.assert_no_unexpected_exceptions() - self.assert_expected_positions() - - def test_demote_and_promote_packaged_workbaskets(self): - """Demotes and promotes the same packaged workbasket.""" - thread1 = threading.Thread( - target=self.synchronised_call, - kwargs={ - "method_name": "demote_position", - "packaged_workbasket": self.packaged_workbaskets[2], - }, - name="DemotePositionThread1", - ) - thread2 = threading.Thread( - target=self.synchronised_call, - kwargs={ - "method_name": "promote_position", - "packaged_workbasket": self.packaged_workbaskets[2], - }, - name="PromotePositionThread2", - ) - - self.execute_threads(threads=[thread1, thread2]) - self.assert_no_unexpected_exceptions() - self.assert_expected_positions() - - def test_abandon_and_create_packaged_workbaskets(self): - """Abandons the last-placed packaged workbasket while creating a new - one.""" - thread1 = threading.Thread( - target=self.synchronised_call, - kwargs={ - "method_name": "abandon", - "packaged_workbasket": self.packaged_workbaskets[4], - }, - name="AbandonThread1", - ) - thread2 = threading.Thread( - target=self.synchronised_create_packaged_workbasket, - name="CreateThread2", - ) - - self.execute_threads(threads=[thread1, thread2]) - self.assert_no_unexpected_exceptions() - self.assert_expected_positions() - - def test_crown_dependencies_publishing_pause_and_unpause(unpause_publishing): """Test that Crown Dependencies publishing operational status can be paused and unpaused."""