Skip to content

Commit

Permalink
Add unblocked_at to tasks
Browse files Browse the repository at this point in the history
Fixes #5057
  • Loading branch information
mdellweg committed Feb 27, 2024
1 parent 43f313b commit 51c29b5
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 60 deletions.
1 change: 1 addition & 0 deletions CHANGES/5057.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added ``unblocked_at`` to tasks to distinguish waiting for other resources from waiting for a free worker.
5 changes: 0 additions & 5 deletions docs/contributing/platform-api/tasking.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ pulp.tasking

.. automodule:: pulpcore.tasking

pulp.tasking.constants
----------------------

.. automodule:: pulpcore.tasking.constants

pulp.tasking.worker
-------------------

Expand Down
18 changes: 18 additions & 0 deletions pulpcore/app/migrations/0117_task_unblocked_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.10 on 2024-02-21 15:49

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the nullable ``unblocked_at`` timestamp column to the ``task`` model."""

    # Must be applied on top of the previous "core" migration.
    dependencies = [
        ("core", "0116_alter_remoteartifact_md5_alter_remoteartifact_sha1_and_more"),
    ]

    # The column is nullable, so existing rows simply get NULL.
    operations = [
        migrations.AddField(
            model_name="task",
            name="unblocked_at",
            field=models.DateTimeField(null=True),
        ),
    ]
8 changes: 8 additions & 0 deletions pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Django models related to the Tasking system
"""

import logging
import traceback
from contextlib import suppress
Expand Down Expand Up @@ -78,6 +79,7 @@ class Task(BaseModel, AutoAddObjPermsMixin):
name = models.TextField()
logging_cid = models.TextField(db_index=True)

unblocked_at = models.DateTimeField(null=True)
started_at = models.DateTimeField(null=True)
finished_at = models.DateTimeField(null=True)

Expand Down Expand Up @@ -255,6 +257,12 @@ def set_canceled(self, final_state=TASK_STATES.CANCELED, reason=None):
with suppress(AttributeError):
del self.error

def unblock(self):
    """Stamp this task's ``unblocked_at`` with the current time.

    The timestamp is written with a queryset ``update()`` filtered on this
    task's primary key, so only the ``unblocked_at`` column is touched.
    """
    # This should be safe to be called without holding the lock.
    this_task = Task.objects.filter(pk=self.pk)
    this_task.update(unblocked_at=timezone.now())
    # Discard the value cached on this instance, which is now stale.
    # NOTE(review): presumably Django then reloads the field from the
    # database on next access (deferred-field behaviour) -- confirm.
    try:
        del self.unblocked_at
    except AttributeError:
        # The attribute was not loaded on this instance; nothing to discard.
        pass

# Example taken from here:
# https://docs.djangoproject.com/en/3.2/ref/models/instances/#refreshing-objects-from-database
def refresh_from_db(self, using=None, fields=None, **kwargs):
Expand Down
9 changes: 7 additions & 2 deletions pulpcore/app/serializers/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ class TaskSerializer(ModelSerializer):
help_text=_("The logging correlation id associated with this task")
)
created_by = serializers.SerializerMethodField(help_text=_("User who dispatched this task."))
unblocked_at = serializers.DateTimeField(
help_text=_("Timestamp of when this task was identified ready for pickup."), read_only=True
)
started_at = serializers.DateTimeField(
help_text=_("Timestamp of the when this task started execution."), read_only=True
help_text=_("Timestamp of when this task started execution."), read_only=True
)
finished_at = serializers.DateTimeField(
help_text=_("Timestamp of the when this task stopped execution."), read_only=True
help_text=_("Timestamp of when this task stopped execution."), read_only=True
)
error = serializers.DictField(
child=serializers.JSONField(),
Expand Down Expand Up @@ -104,6 +107,7 @@ class Meta:
"name",
"logging_cid",
"created_by",
"unblocked_at",
"started_at",
"finished_at",
"error",
Expand All @@ -123,6 +127,7 @@ class Meta:
fields = ModelSerializer.Meta.fields + (
"name",
"state",
"unblocked_at",
"started_at",
"finished_at",
"worker",
Expand Down
1 change: 1 addition & 0 deletions pulpcore/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# !!! Never change these values !!!
TASK_DISPATCH_LOCK = 21
TASK_SCHEDULING_LOCK = 42
TASK_UNBLOCKING_LOCK = 84


#: All valid task states.
Expand Down
8 changes: 0 additions & 8 deletions pulpcore/tasking/constants.py

This file was deleted.

1 change: 1 addition & 0 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def dispatch(
reserved_resources_record__overlap=colliding_resources
).exists()
):
task.unblock()
execute_task(task)
if resources:
notify_workers = True
Expand Down
121 changes: 82 additions & 39 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
from django.db import connection
from django.utils import timezone

from pulpcore.constants import TASK_STATES, TASK_INCOMPLETE_STATES, TASK_SCHEDULING_LOCK
from pulpcore.constants import (
TASK_STATES,
TASK_INCOMPLETE_STATES,
TASK_SCHEDULING_LOCK,
TASK_UNBLOCKING_LOCK,
)
from pulpcore.exceptions import AdvisoryLockError
from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus
Expand All @@ -30,7 +35,6 @@
PGAdvisoryLock,
)


_logger = logging.getLogger(__name__)
random.seed()

Expand Down Expand Up @@ -195,27 +199,61 @@ def is_compatible(self, task):
return False
return True

def identify_unblocked_tasks(self):
    """Mark incomplete tasks whose resources are free as unblocked.

    Walks all tasks in an incomplete state ordered by ``pulp_created`` and
    keeps track of the resources requested by the tasks seen so far.
    A canceling task is always marked unblocked and its resources are not
    counted as held. A waiting task is marked unblocked when none of its
    exclusive resources is held (exclusively or shared) by an earlier task
    and none of its shared resources is held exclusively by an earlier task.

    Returns:
        bool: ``True`` if at least one task was marked unblocked by this call.
    """
    any_unblocked = False
    exclusive_held = set()
    shared_held = set()
    # When batching this query, be sure to use "pulp_created" as a cursor
    pending = Task.objects.filter(state__in=TASK_INCOMPLETE_STATES).order_by("pulp_created")
    for task in pending:
        record = task.reserved_resources_record or []
        wanted_exclusive = [name for name in record if not name.startswith("shared:")]
        # Strip the 7-character "shared:" prefix; a resource that is also
        # requested exclusively is only tracked as exclusive.
        wanted_shared = [
            name[7:]
            for name in record
            if name.startswith("shared:") and name[7:] not in wanted_exclusive
        ]

        if task.state == TASK_STATES.CANCELING:
            if task.unblocked_at is None:
                _logger.debug("Marking canceling task %s unblocked.", task.pk)
                task.unblock()
                any_unblocked = True
            # Don't consider this task's resources as held.
            continue

        if (
            task.state == TASK_STATES.WAITING
            and task.unblocked_at is None
            # No exclusive resource taken?
            and exclusive_held.isdisjoint(wanted_exclusive)
            and shared_held.isdisjoint(wanted_exclusive)
            # No shared resource exclusively taken?
            and exclusive_held.isdisjoint(wanted_shared)
        ):
            _logger.debug("Marking waiting task %s unblocked.", task.pk)
            task.unblock()
            any_unblocked = True

        # Record the resources of the pending task
        exclusive_held.update(wanted_exclusive)
        shared_held.update(wanted_shared)
    return any_unblocked

def iter_tasks(self):
"""Iterate over ready tasks and yield each task while holding the lock."""

while not self.shutdown_requested:
taken_exclusive_resources = set()
taken_shared_resources = set()
# When batching this query, be sure to use "pulp_created" as a cursor
for task in Task.objects.filter(state__in=TASK_INCOMPLETE_STATES).order_by(
"pulp_created"
):
reserved_resources_record = task.reserved_resources_record or []
exclusive_resources = [
resource
for resource in reserved_resources_record
if not resource.startswith("shared:")
]
shared_resources = [
resource[7:]
for resource in reserved_resources_record
if resource.startswith("shared:") and resource[7:] not in exclusive_resources
]
for task in Task.objects.filter(
state__in=TASK_INCOMPLETE_STATES, unblocked_at__isnull=False
).order_by("pulp_created"):
with contextlib.suppress(AdvisoryLockError), task:
# This code will only be called if we acquired the lock successfully
# The lock will automatically be released at the end of the block
Expand All @@ -240,27 +278,13 @@ def iter_tasks(self):
# This statement is using lazy evaluation
if (
task.state == TASK_STATES.WAITING
and task.unblocked_at is not None
and self.is_compatible(task)
# No exclusive resource taken?
and not any(
resource in taken_exclusive_resources
or resource in taken_shared_resources
for resource in exclusive_resources
)
# No shared resource exclusively taken?
and not any(
resource in taken_exclusive_resources for resource in shared_resources
)
):
yield task
# Start from the top of the Task list
break

# Record the resources of the pending task we didn't get
taken_exclusive_resources.update(exclusive_resources)
taken_shared_resources.update(shared_resources)
else:
# If we got here, there is nothing to do
break

def sleep(self):
Expand Down Expand Up @@ -314,6 +338,12 @@ def supervise_task(self, task):
_logger.info(_("Received signal to cancel current task %s."), task.pk)
cancel_state = TASK_STATES.CANCELED
self.cancel_task = False
if self.wakeup:
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(
TASK_UNBLOCKING_LOCK
):
self.identify_unblocked_tasks()
self.wakeup = False
if task_process.sentinel in r:
if not task_process.is_alive():
break
Expand Down Expand Up @@ -350,6 +380,18 @@ def supervise_task(self, task):
self.notify_workers()
self.task = None

def handle_available_tasks(self):
    """Alternate between unblocking tasks and running them until idle.

    Each round first tries to mark tasks unblocked while holding the
    ``TASK_UNBLOCKING_LOCK`` advisory lock, then supervises every task
    yielded by ``iter_tasks()``. Rounds repeat until a round neither
    unblocked nor ran anything, or until shutdown is requested.
    """
    while not self.shutdown_requested:
        try:
            with PGAdvisoryLock(TASK_UNBLOCKING_LOCK):
                found_work = self.identify_unblocked_tasks()
        except AdvisoryLockError:
            # NOTE(review): presumably raised when the lock is held
            # elsewhere -- confirm. Go around again in any case.
            found_work = True
        for task in self.iter_tasks():
            found_work = True
            self.supervise_task(task)
        if not found_work:
            break

def run(self, burst=False):
with WorkerDirectory(self.name):
signal.signal(signal.SIGINT, self._signal_handler)
Expand All @@ -359,15 +401,16 @@ def run(self, burst=False):
connection.connection.add_notify_handler(self._pg_notify_handler)
self.cursor.execute("LISTEN pulp_worker_cancel")
if burst:
for task in self.iter_tasks():
self.supervise_task(task)
self.handle_available_tasks()
else:
self.cursor.execute("LISTEN pulp_worker_wakeup")
while not self.shutdown_requested:
for task in self.iter_tasks():
self.supervise_task(task)
if not self.shutdown_requested:
self.sleep()
if self.shutdown_requested:
break
self.handle_available_tasks()
if self.shutdown_requested:
break
self.sleep()
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.cursor.execute("UNLISTEN pulp_worker_cancel")
self.shutdown()
6 changes: 0 additions & 6 deletions staging_docs/reference/platform-api/tasking.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@
.. automodule:: pulpcore.tasking
```

## pulp.tasking.constants

```{eval-rst}
.. automodule:: pulpcore.tasking.constants
```

## pulp.tasking.worker

```{eval-rst}
Expand Down

0 comments on commit 51c29b5

Please sign in to comment.