diff --git a/docs/configuration/settings.rst b/docs/configuration/settings.rst index 3946b59b3fa..ce970f0cd9a 100644 --- a/docs/configuration/settings.rst +++ b/docs/configuration/settings.rst @@ -474,3 +474,67 @@ ANALYTICS :ref:`analytics docs ` for more info on exactly what is posted along with an example. Defaults to ``True``. + + +.. _kafka-settings: + +Kafka Settings +-------------- + +See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for details on client configuration +properties. + +KAFKA_BOOTSTRAP_SERVERS +^^^^^^^^^^^^^^^^^^^^^^^ + + ``bootstrap.servers`` value for the client. Specifies endpoint(s) for the kafka client. Kafka integration is + disabled if unspecified. + +KAFKA_SECURITY_PROTOCOL +^^^^^^^^^^^^^^^^^^^^^^^ + + ``security.protocol`` value for the client. What protocol to use for communication with the broker. + + Defaults to ``plaintext`` (unencrypted). + +KAFKA_SSL_CA_PEM +^^^^^^^^^^^^^^^^ + + ``ssl.ca.pem`` value for the client (optional). Used to override the TLS truststore for broker connections. + +KAFKA_SASL_MECHANISM +^^^^^^^^^^^^^^^^^^^^ + + ``sasl.mechanisms`` value for the client (optional). Specifies the authentication method used by the kafka broker. + +KAFKA_SASL_USERNAME +^^^^^^^^^^^^^^^^^^^ + + ``sasl.username`` value for the client (optional). Username for broker authentication. + +KAFKA_SASL_PASSWORD +^^^^^^^^^^^^^^^^^^^ + + ``sasl.password`` value for the client (optional). Password for broker authentication. + +KAFKA_TASKS_STATUS_TOPIC +^^^^^^^^^^^^^^^^^^^^^^^^ + + What kafka topic to emit notifications to when tasks start/stop. + + Defaults to ``pulpcore.tasking.status``. + +KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + Whether to synchronously send task status messages. When ``True``, the task message is sent synchronously, otherwise + the sends happen asynchronously, with a background thread periodically sending messages to the kafka server. + + Defaults to ``False``. + +KAFKA_PRODUCER_POLL_TIMEOUT +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + Timeout in seconds for the kafka producer polling thread's ``poll`` calls. + + Defaults to ``0.1``. diff --git a/docs/static/task-status-v1.yaml b/docs/static/task-status-v1.yaml new file mode 100644 index 00000000000..da21c48aa41 --- /dev/null +++ b/docs/static/task-status-v1.yaml @@ -0,0 +1,55 @@ +$schema: http://json-schema.org/draft-07/hyper-schema +$id: https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml +type: object +properties: + pulp_href: + description: URI for the task in the pulp API + type: string + examples: + - /pulp/api/v3/tasks/018f973c-ad7b-7f03-96d0-b38a42c18100/ + pulp_created: + description: Created timestamp for the task + type: string + format: date-time + examples: + - 2024-05-20T18:21:27.292394Z + pulp_last_updated: + description: Last updated timestamp for the task + type: string + format: date-time + examples: + - 2024-05-20T18:21:27.292405Z + name: + description: Name of the task + type: string + examples: + - pulp_file.app.tasks.synchronizing.synchronize + state: + description: State of the task + type: string + enum: + - waiting + - skipped + - running + - completed + - failed + - canceled + - canceling + unblocked_at: + description: The time the task became unblocked + type: string + format: date-time + examples: + - 2024-05-20T18:21:27.317792Z + started_at: + description: The time the task started executing + type: string + format: date-time + examples: + - 2024-05-20T18:21:27.349481Z + finished_at: + description: The time the task finished executing + type: string + format: date-time + examples: + - 2024-05-20T18:21:28.074560Z diff --git a/pulpcore/app/serializers/task.py b/pulpcore/app/serializers/task.py index 88661b24610..1e47040c098 100755 --- a/pulpcore/app/serializers/task.py +++ b/pulpcore/app/serializers/task.py @@ -277,3 +277,21 @@ class Meta: "next_dispatch", "last_task", ) + + +class TaskStatusMessageSerializer(TaskSerializer): + """ + Serializer for Task status messages. + + Independent of other serializers in order to decouple the task message schema from other interfaces. + """ + + class Meta: + model = models.Task + fields = ModelSerializer.Meta.fields + ( + "name", + "state", + "unblocked_at", + "started_at", + "finished_at", + ) diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index d8fb7cf8183..d04b14bda47 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -324,6 +324,17 @@ # By default, use all available workers. IMPORT_WORKERS_PERCENT = 100 +# Kafka settings +KAFKA_BOOTSTRAP_SERVERS = None # kafka integration disabled by default +KAFKA_TASKS_STATUS_TOPIC = "pulpcore.tasking.status" +KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED = False +KAFKA_PRODUCER_POLL_TIMEOUT = 0.1 +KAFKA_SECURITY_PROTOCOL = "plaintext" +KAFKA_SSL_CA_PEM = None +KAFKA_SASL_MECHANISM = None +KAFKA_SASL_USERNAME = None +KAFKA_SASL_PASSWORD = None + # HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py) # Read more at https://www.dynaconf.com/django/ from dynaconf import DjangoDynaconf, Validator # noqa diff --git a/pulpcore/tasking/kafka.py b/pulpcore/tasking/kafka.py new file mode 100644 index 00000000000..22f3dde6b4b --- /dev/null +++ b/pulpcore/tasking/kafka.py @@ -0,0 +1,65 @@ +import atexit +import logging +import socket +from threading import Thread +from typing import Optional + +from confluent_kafka import Producer +from django.conf import settings + +_logger = logging.getLogger(__name__) +_kafka_producer = None +_bootstrap_servers = settings.get("KAFKA_BOOTSTRAP_SERVERS") +_producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT") +_security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL") +_ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM") +_sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM") +_sasl_username = settings.get("KAFKA_SASL_USERNAME") +_sasl_password = settings.get("KAFKA_SASL_PASSWORD") + + +class KafkaProducerPollingWorker: + def __init__(self, kafka_producer): + self._kafka_producer = kafka_producer + self._running = False + self._thread = None + + def start(self): + self._running = True + self._thread = Thread(target=self._run) + self._thread.start() + + def _run(self): + while self._running: + self._kafka_producer.poll(_producer_poll_timeout) + self._kafka_producer.flush() + + def stop(self): + self._running = False + self._thread.join() + + +def get_async_kafka_producer() -> Optional[Producer]: + global _kafka_producer + if _bootstrap_servers is None: + return None + if _kafka_producer is None: + conf = { + "bootstrap.servers": _bootstrap_servers, + "security.protocol": _security_protocol, + "client.id": socket.gethostname(), + } + optional_conf = { + "ssl.ca.pem": _ssl_ca_pem, + "sasl.mechanisms": _sasl_mechanism, + "sasl.username": _sasl_username, + "sasl.password": _sasl_password, + } + for key, value in optional_conf.items(): + if value: + conf[key] = value + _kafka_producer = Producer(conf, logger=_logger) + polling_worker = KafkaProducerPollingWorker(_kafka_producer) + polling_worker.start() + atexit.register(polling_worker.stop) + return _kafka_producer diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index ca19becf9c5..5bd0316890b 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -7,12 +7,19 @@ import traceback from datetime import timedelta from gettext import gettext as _ +from typing import Optional +# NOTE: in spite of the name, cloudevents.http.CloudEvent is appropriate for other protocols +from cloudevents.http import CloudEvent +from cloudevents.kafka import to_structured +from django.conf import settings from django.db import connection, transaction from django.db.models import Model, Max from django_guid import get_guid + from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS from pulpcore.app.models import Task +from pulpcore.app.serializers.task import TaskStatusMessageSerializer from pulpcore.app.util import current_task, get_domain, get_prn, get_url from pulpcore.constants import ( TASK_FINAL_STATES, @@ -20,9 +27,13 @@ TASK_STATES, TASK_DISPATCH_LOCK, ) +from pulpcore.tasking.kafka import get_async_kafka_producer _logger = logging.getLogger(__name__) +_kafka_tasks_status_topic = settings.get("KAFKA_TASKS_STATUS_TOPIC") +_kafka_tasks_status_producer_sync_enabled = settings.get("KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED") + def _validate_and_get_resources(resources): resource_set = set() @@ -78,9 +89,11 @@ def _execute_task(task): task.set_failed(exc, tb) _logger.info(_("Task %s failed (%s)"), task.pk, exc) _logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb)))) + _send_task_notification(task) else: task.set_completed() _logger.info(_("Task completed %s"), task.pk) + _send_task_notification(task) def dispatch( @@ -256,3 +269,32 @@ def cancel_task(task_id): cursor.execute("SELECT pg_notify('pulp_worker_cancel', %s)", (str(task.pk),)) cursor.execute("NOTIFY pulp_worker_wakeup") return task + + +def _send_task_notification(task): + kafka_producer = get_async_kafka_producer() + if kafka_producer is not None: + attributes = { + "type": "pulpcore.tasking.status", + "source": "pulpcore.tasking", + "datacontenttype": "application/json", + "dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml", + } + data = TaskStatusMessageSerializer(task, context={"request": None}).data + task_message = to_structured(CloudEvent(attributes, data)) + kafka_producer.produce( + topic=_kafka_tasks_status_topic, + value=task_message.value, + key=task_message.key, + headers=task_message.headers, + on_delivery=_report_message_delivery, + ) + if _kafka_tasks_status_producer_sync_enabled: + kafka_producer.flush() + + +def _report_message_delivery(error, message): + if error is not None: + _logger.error(error) + elif _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Message delivery successfully with contents {message.value}") diff --git a/requirements.txt b/requirements.txt index 54da0e86ff6..69b85aa154a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,8 @@ asyncio-throttle>=1.0,<=1.0.2 async-timeout>=4.0.3,<4.0.4;python_version<"3.11" backoff>=2.1.2,<2.2.2 click>=8.1.0,<=8.1.7 +cloudevents==1.10.1 # Pinned because project warns "things might (and will) break with every update" +confluent-kafka~=2.4.0 cryptography>=38.0.1,<42.0.8 Django~=4.2.0 # LTS version, switch only if we have a compelling reason to django-filter>=23.1,<=24.2