From 51c29b503c12edc9ac6072177ef6c5c20cc27cf6 Mon Sep 17 00:00:00 2001 From: Matthias Dellweg Date: Wed, 21 Feb 2024 16:50:39 +0100 Subject: [PATCH] Add unblocked_at to tasks Fixes #5057 --- CHANGES/5057.feature | 1 + docs/contributing/platform-api/tasking.rst | 5 - .../app/migrations/0117_task_unblocked_at.py | 18 +++ pulpcore/app/models/task.py | 8 ++ pulpcore/app/serializers/task.py | 9 +- pulpcore/constants.py | 1 + pulpcore/tasking/constants.py | 8 -- pulpcore/tasking/tasks.py | 1 + pulpcore/tasking/worker.py | 121 ++++++++++++------ .../reference/platform-api/tasking.md | 6 - 10 files changed, 118 insertions(+), 60 deletions(-) create mode 100644 CHANGES/5057.feature create mode 100644 pulpcore/app/migrations/0117_task_unblocked_at.py delete mode 100644 pulpcore/tasking/constants.py diff --git a/CHANGES/5057.feature b/CHANGES/5057.feature new file mode 100644 index 0000000000..42d59bbb02 --- /dev/null +++ b/CHANGES/5057.feature @@ -0,0 +1 @@ +Added ``unblocked_at`` to tasks to distinguish waiting for other resources from waiting for a free worker. diff --git a/docs/contributing/platform-api/tasking.rst b/docs/contributing/platform-api/tasking.rst index 1a11cd0132..ddf5a5c44d 100644 --- a/docs/contributing/platform-api/tasking.rst +++ b/docs/contributing/platform-api/tasking.rst @@ -4,11 +4,6 @@ pulp.tasking .. automodule:: pulpcore.tasking -pulp.tasking.constants ----------------------- - -.. 
automodule:: pulpcore.tasking.constants - pulp.tasking.worker ------------------- diff --git a/pulpcore/app/migrations/0117_task_unblocked_at.py b/pulpcore/app/migrations/0117_task_unblocked_at.py new file mode 100644 index 0000000000..4b56cea3cf --- /dev/null +++ b/pulpcore/app/migrations/0117_task_unblocked_at.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.10 on 2024-02-21 15:49 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0116_alter_remoteartifact_md5_alter_remoteartifact_sha1_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='task', + name='unblocked_at', + field=models.DateTimeField(null=True), + ), + ] diff --git a/pulpcore/app/models/task.py b/pulpcore/app/models/task.py index 3bb2174ce3..3cf5fd62b4 100644 --- a/pulpcore/app/models/task.py +++ b/pulpcore/app/models/task.py @@ -1,6 +1,7 @@ """ Django models related to the Tasking system """ + import logging import traceback from contextlib import suppress @@ -78,6 +79,7 @@ class Task(BaseModel, AutoAddObjPermsMixin): name = models.TextField() logging_cid = models.TextField(db_index=True) + unblocked_at = models.DateTimeField(null=True) started_at = models.DateTimeField(null=True) finished_at = models.DateTimeField(null=True) @@ -255,6 +257,12 @@ def set_canceled(self, final_state=TASK_STATES.CANCELED, reason=None): with suppress(AttributeError): del self.error + def unblock(self): + # This should be safe to be called without holding the lock. 
+ Task.objects.filter(pk=self.pk).update(unblocked_at=timezone.now()) + with suppress(AttributeError): + del self.unblocked_at + # Example taken from here: # https://docs.djangoproject.com/en/3.2/ref/models/instances/#refreshing-objects-from-database def refresh_from_db(self, using=None, fields=None, **kwargs): diff --git a/pulpcore/app/serializers/task.py b/pulpcore/app/serializers/task.py index f9b9c42717..88661b2461 100755 --- a/pulpcore/app/serializers/task.py +++ b/pulpcore/app/serializers/task.py @@ -38,11 +38,14 @@ class TaskSerializer(ModelSerializer): help_text=_("The logging correlation id associated with this task") ) created_by = serializers.SerializerMethodField(help_text=_("User who dispatched this task.")) + unblocked_at = serializers.DateTimeField( + help_text=_("Timestamp of when this task was identified ready for pickup."), read_only=True + ) started_at = serializers.DateTimeField( - help_text=_("Timestamp of the when this task started execution."), read_only=True + help_text=_("Timestamp of when this task started execution."), read_only=True ) finished_at = serializers.DateTimeField( - help_text=_("Timestamp of the when this task stopped execution."), read_only=True + help_text=_("Timestamp of when this task stopped execution."), read_only=True ) error = serializers.DictField( child=serializers.JSONField(), @@ -104,6 +107,7 @@ class Meta: "name", "logging_cid", "created_by", + "unblocked_at", "started_at", "finished_at", "error", @@ -123,6 +127,7 @@ class Meta: fields = ModelSerializer.Meta.fields + ( "name", "state", + "unblocked_at", "started_at", "finished_at", "worker", diff --git a/pulpcore/constants.py b/pulpcore/constants.py index aaaa4d431d..30759e6696 100644 --- a/pulpcore/constants.py +++ b/pulpcore/constants.py @@ -11,6 +11,7 @@ # !!! Never change these values !!! TASK_DISPATCH_LOCK = 21 TASK_SCHEDULING_LOCK = 42 +TASK_UNBLOCKING_LOCK = 84 #: All valid task states. 
diff --git a/pulpcore/tasking/constants.py b/pulpcore/tasking/constants.py deleted file mode 100644 index 15f276ef1f..0000000000 --- a/pulpcore/tasking/constants.py +++ /dev/null @@ -1,8 +0,0 @@ -from types import SimpleNamespace - -TASKING_CONSTANTS = SimpleNamespace( - # The name of resource manager entries in the workers table - RESOURCE_MANAGER_WORKER_NAME="resource-manager", - # The amount of time (in seconds) between checks - JOB_MONITORING_INTERVAL=5, -) diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index 839a4ab52f..1c89125f19 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -203,6 +203,7 @@ def dispatch( reserved_resources_record__overlap=colliding_resources ).exists() ): + task.unblock() execute_task(task) if resources: notify_workers = True diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index dc5a8f8a91..c22779ef5e 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -16,7 +16,12 @@ from django.db import connection from django.utils import timezone -from pulpcore.constants import TASK_STATES, TASK_INCOMPLETE_STATES, TASK_SCHEDULING_LOCK +from pulpcore.constants import ( + TASK_STATES, + TASK_INCOMPLETE_STATES, + TASK_SCHEDULING_LOCK, + TASK_UNBLOCKING_LOCK, +) from pulpcore.exceptions import AdvisoryLockError from pulpcore.app.apps import pulp_plugin_configs from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus @@ -30,7 +35,6 @@ PGAdvisoryLock, ) - _logger = logging.getLogger(__name__) random.seed() @@ -195,27 +199,61 @@ def is_compatible(self, task): return False return True + def identify_unblocked_tasks(self): + """Iterate over waiting tasks and mark them unblocked accordingly.""" + + changed = False + taken_exclusive_resources = set() + taken_shared_resources = set() + # When batching this query, be sure to use "pulp_created" as a cursor + for task in Task.objects.filter(state__in=TASK_INCOMPLETE_STATES).order_by("pulp_created"): + 
reserved_resources_record = task.reserved_resources_record or [] + exclusive_resources = [ + resource + for resource in reserved_resources_record + if not resource.startswith("shared:") + ] + shared_resources = [ + resource[7:] + for resource in reserved_resources_record + if resource.startswith("shared:") and resource[7:] not in exclusive_resources + ] + if task.state == TASK_STATES.CANCELING: + if task.unblocked_at is None: + _logger.debug("Marking canceling task %s unblocked.", task.pk) + task.unblock() + changed = True + # Don't consider this task's resources as held. + continue + + elif ( + task.state == TASK_STATES.WAITING + and task.unblocked_at is None + # No exclusive resource taken? + and not any( + resource in taken_exclusive_resources or resource in taken_shared_resources + for resource in exclusive_resources + ) + # No shared resource exclusively taken? + and not any(resource in taken_exclusive_resources for resource in shared_resources) + ): + _logger.debug("Marking waiting task %s unblocked.", task.pk) + task.unblock() + changed = True + + # Record the resources of the pending task + taken_exclusive_resources.update(exclusive_resources) + taken_shared_resources.update(shared_resources) + return changed + def iter_tasks(self): """Iterate over ready tasks and yield each task while holding the lock.""" while not self.shutdown_requested: - taken_exclusive_resources = set() - taken_shared_resources = set() # When batching this query, be sure to use "pulp_created" as a cursor - for task in Task.objects.filter(state__in=TASK_INCOMPLETE_STATES).order_by( - "pulp_created" - ): - reserved_resources_record = task.reserved_resources_record or [] - exclusive_resources = [ - resource - for resource in reserved_resources_record - if not resource.startswith("shared:") - ] - shared_resources = [ - resource[7:] - for resource in reserved_resources_record - if resource.startswith("shared:") and resource[7:] not in exclusive_resources - ] + for task in 
Task.objects.filter( + state__in=TASK_INCOMPLETE_STATES, unblocked_at__isnull=False + ).order_by("pulp_created"): with contextlib.suppress(AdvisoryLockError), task: # This code will only be called if we acquired the lock successfully # The lock will be automatically be released at the end of the block @@ -240,27 +278,13 @@ def iter_tasks(self): # This statement is using lazy evaluation if ( task.state == TASK_STATES.WAITING + and task.unblocked_at is not None and self.is_compatible(task) - # No exclusive resource taken? - and not any( - resource in taken_exclusive_resources - or resource in taken_shared_resources - for resource in exclusive_resources - ) - # No shared resource exclusively taken? - and not any( - resource in taken_exclusive_resources for resource in shared_resources - ) ): yield task # Start from the top of the Task list break - - # Record the resources of the pending task we didn't get - taken_exclusive_resources.update(exclusive_resources) - taken_shared_resources.update(shared_resources) else: - # If we got here, there is nothing to do break def sleep(self): @@ -314,6 +338,12 @@ def supervise_task(self, task): _logger.info(_("Received signal to cancel current task %s."), task.pk) cancel_state = TASK_STATES.CANCELED self.cancel_task = False + if self.wakeup: + with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock( + TASK_UNBLOCKING_LOCK + ): + self.identify_unblocked_tasks() + self.wakeup = False if task_process.sentinel in r: if not task_process.is_alive(): break @@ -350,6 +380,18 @@ def supervise_task(self, task): self.notify_workers() self.task = None + def handle_available_tasks(self): + keep_looping = True + while keep_looping and not self.shutdown_requested: + try: + with PGAdvisoryLock(TASK_UNBLOCKING_LOCK): + keep_looping = self.identify_unblocked_tasks() + except AdvisoryLockError: + keep_looping = True + for task in self.iter_tasks(): + keep_looping = True + self.supervise_task(task) + def run(self, burst=False): with 
WorkerDirectory(self.name): signal.signal(signal.SIGINT, self._signal_handler) @@ -359,15 +401,16 @@ def run(self, burst=False): connection.connection.add_notify_handler(self._pg_notify_handler) self.cursor.execute("LISTEN pulp_worker_cancel") if burst: - for task in self.iter_tasks(): - self.supervise_task(task) + self.handle_available_tasks() else: self.cursor.execute("LISTEN pulp_worker_wakeup") while not self.shutdown_requested: - for task in self.iter_tasks(): - self.supervise_task(task) - if not self.shutdown_requested: - self.sleep() + if self.shutdown_requested: + break + self.handle_available_tasks() + if self.shutdown_requested: + break + self.sleep() self.cursor.execute("UNLISTEN pulp_worker_wakeup") self.cursor.execute("UNLISTEN pulp_worker_cancel") self.shutdown() diff --git a/staging_docs/reference/platform-api/tasking.md b/staging_docs/reference/platform-api/tasking.md index 4a14c90f78..f6c9558663 100644 --- a/staging_docs/reference/platform-api/tasking.md +++ b/staging_docs/reference/platform-api/tasking.md @@ -4,12 +4,6 @@ .. automodule:: pulpcore.tasking ``` -## pulp.tasking.constants - -```{eval-rst} -.. automodule:: pulpcore.tasking.constants -``` - ## pulp.tasking.worker ```{eval-rst}