Add kafka producer to send task status messages
To try locally, you can use the oci-env kafka profile from pulp/oci_env#159.

Set up the oci-env to use the kafka profile:

```
COMPOSE_PROFILE=kafka
```

From a fresh oci-env pulp instance, try:

```shell
export REPO_NAME=$(head /dev/urandom | tr -dc a-z | head -c5)
export REMOTE_NAME=$(head /dev/urandom | tr -dc a-z | head -c5)
oci-env pulp file repository create --name $REPO_NAME
oci-env pulp file remote create --name $REMOTE_NAME \
    --url 'https://fixtures.pulpproject.org/file/PULP_MANIFEST'
oci-env pulp file repository sync --name $REPO_NAME --remote $REMOTE_NAME
```

Then inspect the kafka message that is produced via:

```shell
oci-env exec -s kafka \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server=localhost:9092 \
  --offset earliest \
  --partition 0 \
  --topic pulpcore.tasking.status \
  --max-messages 1
```
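
If you'd rather inspect the message from Python, a minimal `confluent_kafka` consumer along these lines should also work (the group id is arbitrary, and the broker address assumes the oci-env kafka profile's defaults):

```python
from confluent_kafka import Consumer

# Minimal sketch; assumes the oci-env kafka broker on localhost:9092 and the
# default topic name. The group id "task-status-demo" is made up for this demo.
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "task-status-demo",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["pulpcore.tasking.status"])
try:
    msg = consumer.poll(timeout=10.0)
    if msg is not None and msg.error() is None:
        # Structured CloudEvents mode: the message value is a JSON envelope.
        print(msg.value().decode("utf-8"))
finally:
    consumer.close()
```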

Closes pulp#5337
kahowell committed May 21, 2024
1 parent a5f88d1 commit c4030e7
Showing 7 changed files with 257 additions and 0 deletions.
64 changes: 64 additions & 0 deletions docs/configuration/settings.rst
@@ -474,3 +474,67 @@ ANALYTICS
:ref:`analytics docs <analytics>` for more info on exactly what is posted along with an example.

Defaults to ``True``.


.. _kafka-settings:

Kafka Settings
--------------

See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for details on client configuration
properties.

KAFKA_BOOTSTRAP_SERVERS
^^^^^^^^^^^^^^^^^^^^^^^

``bootstrap.servers`` value for the client. Specifies endpoint(s) for the kafka client. Kafka integration is
disabled if unspecified.

KAFKA_SECURITY_PROTOCOL
^^^^^^^^^^^^^^^^^^^^^^^

``security.protocol`` value for the client. The protocol to use for communication with the broker.

Defaults to ``plaintext`` (unencrypted).

KAFKA_SSL_CA_PEM
^^^^^^^^^^^^^^^^

``ssl.ca.pem`` value for the client (optional). Used to override the TLS truststore for broker connections.

KAFKA_SASL_MECHANISM
^^^^^^^^^^^^^^^^^^^^

``sasl.mechanisms`` value for the client (optional). Specifies the SASL mechanism the client uses to authenticate with the kafka broker.

KAFKA_SASL_USERNAME
^^^^^^^^^^^^^^^^^^^

``sasl.username`` value for the client (optional). Username for broker authentication.

KAFKA_SASL_PASSWORD
^^^^^^^^^^^^^^^^^^^

``sasl.password`` value for the client (optional). Password for broker authentication.

KAFKA_TASKS_STATUS_TOPIC
^^^^^^^^^^^^^^^^^^^^^^^^

The kafka topic to which task status notifications are emitted when tasks start and stop.

Defaults to ``pulpcore.tasking.status``.

KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Whether to send task status messages synchronously. When ``True``, each task message is flushed to the kafka
server before task processing continues; otherwise sends happen asynchronously, with a background thread
periodically delivering queued messages to the kafka server.

Defaults to ``False``.

KAFKA_PRODUCER_POLL_TIMEOUT
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Timeout in seconds for the kafka producer polling thread's ``poll`` calls.

Defaults to ``0.1``.
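
Putting these together, a TLS + SASL configuration might look like the following sketch (all values are placeholders; with pulp's dynaconf-based settings the same keys can typically also be supplied as ``PULP_``-prefixed environment variables):

```python
# Hypothetical example values; adjust for your broker.
KAFKA_BOOTSTRAP_SERVERS = "kafka1.example.com:9092,kafka2.example.com:9092"
KAFKA_SECURITY_PROTOCOL = "sasl_ssl"
KAFKA_SASL_MECHANISM = "SCRAM-SHA-512"
KAFKA_SASL_USERNAME = "pulp"
KAFKA_SASL_PASSWORD = "changeme"
# Optional: flush each task status message before continuing.
KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED = True
```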
55 changes: 55 additions & 0 deletions docs/static/task-status-v1.yaml
@@ -0,0 +1,55 @@
$schema: http://json-schema.org/draft-07/hyper-schema
$id: https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml
type: object
properties:
  pulp_href:
    description: URI for the task in the pulp API
    type: string
    examples:
      - /pulp/api/v3/tasks/018f973c-ad7b-7f03-96d0-b38a42c18100/
  pulp_created:
    description: Created timestamp for the task
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:27.292394Z
  pulp_last_updated:
    description: Last updated timestamp for the task
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:27.292405Z
  name:
    description: Name of the task
    type: string
    examples:
      - pulp_file.app.tasks.synchronizing.synchronize
  state:
    description: State of the task
    type: string
    enum:
      - waiting
      - skipped
      - running
      - completed
      - failed
      - canceled
      - canceling
  unblocked_at:
    description: The time the task became unblocked
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:27.317792Z
  started_at:
    description: The time the task started executing
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:27.349481Z
  finished_at:
    description: The time the task finished executing
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:28.074560Z
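
To sanity-check a captured payload against this schema, a short sketch using ``jsonschema`` and PyYAML (both assumed to be available; neither is a pulpcore requirement) could look like:

```python
import yaml  # PyYAML, assumed installed for this check
from jsonschema import Draft7Validator  # assumed installed for this check

# Load the schema shipped in docs/static/task-status-v1.yaml.
with open("docs/static/task-status-v1.yaml") as f:
    schema = yaml.safe_load(f)

# Sample message assembled from the schema's own examples.
message = {
    "pulp_href": "/pulp/api/v3/tasks/018f973c-ad7b-7f03-96d0-b38a42c18100/",
    "pulp_created": "2024-05-20T18:21:27.292394Z",
    "pulp_last_updated": "2024-05-20T18:21:27.292405Z",
    "name": "pulp_file.app.tasks.synchronizing.synchronize",
    "state": "completed",
    "unblocked_at": "2024-05-20T18:21:27.317792Z",
    "started_at": "2024-05-20T18:21:27.349481Z",
    "finished_at": "2024-05-20T18:21:28.074560Z",
}

Draft7Validator(schema).validate(message)  # raises ValidationError on mismatch
```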
18 changes: 18 additions & 0 deletions pulpcore/app/serializers/task.py
@@ -277,3 +277,21 @@ class Meta:
"next_dispatch",
"last_task",
)


class TaskStatusMessageSerializer(TaskSerializer):
    """
    Serializer for Task status messages.

    Independent of other serializers in order to decouple the task message schema from other
    interfaces.
    """

    class Meta:
        model = models.Task
        fields = ModelSerializer.Meta.fields + (
            "name",
            "state",
            "unblocked_at",
            "started_at",
            "finished_at",
        )
11 changes: 11 additions & 0 deletions pulpcore/app/settings.py
@@ -324,6 +324,17 @@
# By default, use all available workers.
IMPORT_WORKERS_PERCENT = 100

# Kafka settings
KAFKA_BOOTSTRAP_SERVERS = None # kafka integration disabled by default
KAFKA_TASKS_STATUS_TOPIC = "pulpcore.tasking.status"
KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED = False
KAFKA_PRODUCER_POLL_TIMEOUT = 0.1
KAFKA_SECURITY_PROTOCOL = "plaintext"
KAFKA_SSL_CA_PEM = None
KAFKA_SASL_MECHANISM = None
KAFKA_SASL_USERNAME = None
KAFKA_SASL_PASSWORD = None

# HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py)
# Read more at https://www.dynaconf.com/django/
from dynaconf import DjangoDynaconf, Validator # noqa
65 changes: 65 additions & 0 deletions pulpcore/tasking/kafka.py
@@ -0,0 +1,65 @@
import atexit
import logging
import socket
from threading import Thread
from typing import Optional

from confluent_kafka import Producer
from django.conf import settings

_logger = logging.getLogger(__name__)
_kafka_producer = None
_bootstrap_servers = settings.get("KAFKA_BOOTSTRAP_SERVERS")
_producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT")
_security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL")
_ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM")
_sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM")
_sasl_username = settings.get("KAFKA_SASL_USERNAME")
_sasl_password = settings.get("KAFKA_SASL_PASSWORD")


class KafkaProducerPollingWorker:
    """Background worker that polls the producer so queued messages and delivery callbacks are serviced."""

    def __init__(self, kafka_producer):
        self._kafka_producer = kafka_producer
        self._running = False
        self._thread = None

    def start(self):
        self._running = True
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _run(self):
        while self._running:
            self._kafka_producer.poll(_producer_poll_timeout)
        # On shutdown, deliver any messages still queued in the producer.
        self._kafka_producer.flush()

    def stop(self):
        self._running = False
        self._thread.join()


def get_async_kafka_producer() -> Optional[Producer]:
    """Return the shared kafka Producer, or None when KAFKA_BOOTSTRAP_SERVERS is unset."""
    global _kafka_producer
    if _bootstrap_servers is None:
        return None
    if _kafka_producer is None:
        conf = {
            "bootstrap.servers": _bootstrap_servers,
            "security.protocol": _security_protocol,
            "client.id": socket.gethostname(),
        }
        optional_conf = {
            "ssl.ca.pem": _ssl_ca_pem,
            "sasl.mechanisms": _sasl_mechanism,
            "sasl.username": _sasl_username,
            "sasl.password": _sasl_password,
        }
        for key, value in optional_conf.items():
            if value:
                conf[key] = value
        _kafka_producer = Producer(conf, logger=_logger)
        polling_worker = KafkaProducerPollingWorker(_kafka_producer)
        polling_worker.start()
        atexit.register(polling_worker.stop)
    return _kafka_producer
42 changes: 42 additions & 0 deletions pulpcore/tasking/tasks.py
@@ -7,22 +7,33 @@
import traceback
from datetime import timedelta
from gettext import gettext as _
from typing import Optional

# NOTE: in spite of the name, cloudevents.http.CloudEvent is appropriate for other protocols
from cloudevents.http import CloudEvent
from cloudevents.kafka import to_structured
from django.conf import settings
from django.db import connection, transaction
from django.db.models import Model, Max
from django_guid import get_guid

from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
from pulpcore.app.models import Task
from pulpcore.app.serializers.task import TaskStatusMessageSerializer
from pulpcore.app.util import current_task, get_domain, get_prn, get_url
from pulpcore.constants import (
    TASK_FINAL_STATES,
    TASK_INCOMPLETE_STATES,
    TASK_STATES,
    TASK_DISPATCH_LOCK,
)
from pulpcore.tasking.kafka import get_async_kafka_producer

_logger = logging.getLogger(__name__)

_kafka_tasks_status_topic = settings.get("KAFKA_TASKS_STATUS_TOPIC")
_kafka_tasks_status_producer_sync_enabled = settings.get("KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED")


def _validate_and_get_resources(resources):
    resource_set = set()
@@ -78,9 +89,11 @@ def _execute_task(task):
        task.set_failed(exc, tb)
        _logger.info(_("Task %s failed (%s)"), task.pk, exc)
        _logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb))))
        _send_task_notification(task)
    else:
        task.set_completed()
        _logger.info(_("Task completed %s"), task.pk)
        _send_task_notification(task)


def dispatch(
@@ -256,3 +269,32 @@ def cancel_task(task_id):
cursor.execute("SELECT pg_notify('pulp_worker_cancel', %s)", (str(task.pk),))
cursor.execute("NOTIFY pulp_worker_wakeup")
return task


def _send_task_notification(task):
    kafka_producer = get_async_kafka_producer()
    if kafka_producer is not None:
        attributes = {
            "type": "pulpcore.tasking.status",
            "source": "pulpcore.tasking",
            "datacontenttype": "application/json",
            "dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml",
        }
        data = TaskStatusMessageSerializer(task, context={"request": None}).data
        task_message = to_structured(CloudEvent(attributes, data))
        kafka_producer.produce(
            topic=_kafka_tasks_status_topic,
            value=task_message.value,
            key=task_message.key,
            headers=task_message.headers,
            on_delivery=_report_message_delivery,
        )
        if _kafka_tasks_status_producer_sync_enabled:
            kafka_producer.flush()


def _report_message_delivery(error, message):
    if error is not None:
        _logger.error(error)
    elif _logger.isEnabledFor(logging.DEBUG):
        _logger.debug(f"Message delivered successfully with contents {message.value}")
2 changes: 2 additions & 0 deletions requirements.txt
@@ -5,6 +5,8 @@ asyncio-throttle>=1.0,<=1.0.2
async-timeout>=4.0.3,<4.0.4;python_version<"3.11"
backoff>=2.1.2,<2.2.2
click>=8.1.0,<=8.1.7
cloudevents==1.10.1 # Pinned because project warns "things might (and will) break with every update"
confluent-kafka~=2.4.0
cryptography>=38.0.1,<42.0.8
Django~=4.2.0 # LTS version, switch only if we have a compelling reason to
django-filter>=23.1,<=24.2