
Commit 44c53d3

Add kafka producer to send task status messages
To try locally, you can use the oci-env kafka profile from pulp/oci_env#159. Set up the oci-env to use the kafka profile:

```
COMPOSE_PROFILE=kafka
```

From a fresh oci-env pulp instance, try:

```shell
export REPO_NAME=$(head /dev/urandom | tr -dc a-z | head -c5)
export REMOTE_NAME=$(head /dev/urandom | tr -dc a-z | head -c5)
oci-env pulp file repository create --name $REPO_NAME
oci-env pulp file remote create --name $REMOTE_NAME \
    --url 'https://fixtures.pulpproject.org/file/PULP_MANIFEST'
oci-env pulp file repository sync --name $REPO_NAME --remote $REMOTE_NAME
```

Then inspect the kafka message that is produced via:

```shell
oci-env exec -s kafka \
    /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server=localhost:9092 \
    --offset earliest \
    --partition 0 \
    --topic pulpcore.tasking.status \
    --max-messages 1
```

Closes pulp#5337
1 parent 6fe9bf3 commit 44c53d3

File tree

9 files changed

+307
-0
lines changed


CHANGES/5337.feature

Lines changed: 1 addition & 0 deletions
```
Added kafka integration (tech-preview).
```

docs/static/task-status-v1.yaml

Lines changed: 55 additions & 0 deletions
```yaml
$schema: http://json-schema.org/draft-07/hyper-schema
$id: https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml
type: object
properties:
  pulp_href:
    description: URI for the task in the pulp API
    type: string
    examples:
      - /pulp/api/v3/tasks/018f973c-ad7b-7f03-96d0-b38a42c18100/
  pulp_created:
    description: Created timestamp for the task
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:27.292394Z
  pulp_last_updated:
    description: Last updated timestamp for the task
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:27.292405Z
  name:
    description: Name of the task
    type: string
    examples:
      - pulp_file.app.tasks.synchronizing.synchronize
  state:
    description: State of the task
    type: string
    enum:
      - waiting
      - skipped
      - running
      - completed
      - failed
      - canceled
      - canceling
  unblocked_at:
    description: The time the task became unblocked
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:27.317792Z
  started_at:
    description: The time the task started executing
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:27.349481Z
  finished_at:
    description: The time the task finished executing
    type: string
    format: date-time
    examples:
      - 2024-05-20T18:21:28.074560Z
```
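As a quick sanity check, a consumer can verify an incoming payload against the required fields and the `state` enum from this schema. The snippet below is an illustrative, stdlib-only sketch (the payload values come from the schema's examples, not from a live system, and `check_task_status` is a hypothetical helper, not part of pulpcore):

```python
import json
from datetime import datetime

# State values permitted by the schema's "state" enum.
VALID_STATES = {
    "waiting", "skipped", "running", "completed", "failed", "canceled", "canceling"
}


def check_task_status(payload: dict) -> list:
    """Return a list of problems found in a task-status payload (empty if OK)."""
    problems = []
    for field in ("pulp_href", "pulp_created", "name", "state"):
        if not isinstance(payload.get(field), str):
            problems.append(f"missing or non-string field: {field}")
    if payload.get("state") not in VALID_STATES:
        problems.append(f"unexpected state: {payload.get('state')!r}")
    # Timestamps use RFC 3339 format with a trailing Z; parse to confirm.
    for field in ("pulp_created", "finished_at"):
        value = payload.get(field)
        if value is not None:
            datetime.fromisoformat(value.replace("Z", "+00:00"))
    return problems


message = json.loads("""{
  "pulp_href": "/pulp/api/v3/tasks/018f973c-ad7b-7f03-96d0-b38a42c18100/",
  "pulp_created": "2024-05-20T18:21:27.292394Z",
  "name": "pulp_file.app.tasks.synchronizing.synchronize",
  "state": "completed",
  "finished_at": "2024-05-20T18:21:28.074560Z"
}""")
print(check_task_status(message))  # []
```

A full implementation would validate against the published schema with a JSON Schema library rather than hand-rolled checks.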

pulpcore/app/serializers/task.py

Lines changed: 19 additions & 0 deletions
```diff
@@ -277,3 +277,22 @@ class Meta:
             "next_dispatch",
             "last_task",
         )
+
+
+class TaskStatusMessageSerializer(TaskSerializer):
+    """
+    Serializer for Task status messages.
+
+    Independent of other serializers in order to decouple the task message schema from other
+    interfaces.
+    """
+
+    class Meta:
+        model = models.Task
+        fields = ModelSerializer.Meta.fields + (
+            "name",
+            "state",
+            "unblocked_at",
+            "started_at",
+            "finished_at",
+        )
```

pulpcore/app/settings.py

Lines changed: 11 additions & 0 deletions
```diff
@@ -329,6 +329,17 @@
 # By default, use all available workers.
 IMPORT_WORKERS_PERCENT = 100
 
+# Kafka settings
+KAFKA_BOOTSTRAP_SERVERS = None  # kafka integration disabled by default
+KAFKA_TASKS_STATUS_TOPIC = "pulpcore.tasking.status"
+KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED = False
+KAFKA_PRODUCER_POLL_TIMEOUT = 0.1
+KAFKA_SECURITY_PROTOCOL = "plaintext"
+KAFKA_SSL_CA_PEM = None
+KAFKA_SASL_MECHANISM = None
+KAFKA_SASL_USERNAME = None
+KAFKA_SASL_PASSWORD = None
+
 # HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py)
 # Read more at https://www.dynaconf.com/django/
 from dynaconf import DjangoDynaconf, Validator  # noqa
```

pulpcore/tasking/kafka.py

Lines changed: 65 additions & 0 deletions
```python
import atexit
import logging
import socket
from threading import Thread
from typing import Optional

from confluent_kafka import Producer
from django.conf import settings

_logger = logging.getLogger(__name__)
_kafka_producer = None
_bootstrap_servers = settings.get("KAFKA_BOOTSTRAP_SERVERS")
_producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT")
_security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL")
_ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM")
_sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM")
_sasl_username = settings.get("KAFKA_SASL_USERNAME")
_sasl_password = settings.get("KAFKA_SASL_PASSWORD")


class KafkaProducerPollingWorker:
    def __init__(self, kafka_producer):
        self._kafka_producer = kafka_producer
        self._running = False
        self._thread = None

    def start(self):
        self._running = True
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _run(self):
        while self._running:
            self._kafka_producer.poll(_producer_poll_timeout)
        self._kafka_producer.flush()

    def stop(self):
        self._running = False
        self._thread.join()


def get_kafka_producer() -> Optional[Producer]:
    global _kafka_producer
    if _bootstrap_servers is None:
        return None
    if _kafka_producer is None:
        conf = {
            "bootstrap.servers": _bootstrap_servers,
            "security.protocol": _security_protocol,
            "client.id": socket.gethostname(),
        }
        optional_conf = {
            "ssl.ca.pem": _ssl_ca_pem,
            "sasl.mechanisms": _sasl_mechanism,
            "sasl.username": _sasl_username,
            "sasl.password": _sasl_password,
        }
        for key, value in optional_conf.items():
            if value:
                conf[key] = value
        _kafka_producer = Producer(conf, logger=_logger)
        polling_worker = KafkaProducerPollingWorker(_kafka_producer)
        polling_worker.start()
        atexit.register(polling_worker.stop)
    return _kafka_producer
```
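The reason for the background thread: librdkafka fires producer delivery callbacks only when the application calls `poll()`, so an asynchronous producer needs something polling continuously. The lifecycle can be demonstrated without a broker using a stand-in producer (`FakeProducer` and `PollingWorker` below are illustrative names, not pulpcore APIs):

```python
import time
from threading import Thread


class FakeProducer:
    """Stand-in for confluent_kafka.Producer, to illustrate the polling loop."""

    def __init__(self):
        self.polls = 0
        self.flushed = False

    def poll(self, timeout):
        # A real producer would dispatch delivery callbacks here.
        self.polls += 1
        time.sleep(timeout)

    def flush(self):
        self.flushed = True


class PollingWorker:
    """Same lifecycle as KafkaProducerPollingWorker: poll on a background
    thread while running, flush once after stop() is requested."""

    def __init__(self, producer, poll_timeout=0.01):
        self._producer = producer
        self._poll_timeout = poll_timeout
        self._running = False
        self._thread = None

    def start(self):
        self._running = True
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _run(self):
        while self._running:
            self._producer.poll(self._poll_timeout)
        self._producer.flush()

    def stop(self):
        self._running = False
        self._thread.join()


producer = FakeProducer()
worker = PollingWorker(producer)
worker.start()
time.sleep(0.05)   # simulate the process doing other work
worker.stop()
print(producer.polls > 0, producer.flushed)  # True True
```

Note the final `flush()` after the loop exits: it blocks until queued messages are delivered, which is why the worker is registered with `atexit` in the real module.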

pulpcore/tasking/tasks.py

Lines changed: 40 additions & 0 deletions
```diff
@@ -8,21 +8,30 @@
 from datetime import timedelta
 from gettext import gettext as _
 
+# NOTE: in spite of the name, cloudevents.http.CloudEvent is appropriate for other protocols
+from cloudevents.http import CloudEvent
+from cloudevents.kafka import to_structured
+from django.conf import settings
 from django.db import connection, transaction
 from django.db.models import Model, Max
 from django_guid import get_guid
 from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
 from pulpcore.app.models import Task
+from pulpcore.app.serializers.task import TaskStatusMessageSerializer
 from pulpcore.app.util import current_task, get_domain, get_prn
 from pulpcore.constants import (
     TASK_FINAL_STATES,
     TASK_INCOMPLETE_STATES,
     TASK_STATES,
     TASK_DISPATCH_LOCK,
 )
+from pulpcore.tasking.kafka import get_kafka_producer
 
 _logger = logging.getLogger(__name__)
 
+_kafka_tasks_status_topic = settings.get("KAFKA_TASKS_STATUS_TOPIC")
+_kafka_tasks_status_producer_sync_enabled = settings.get("KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED")
+
 
 def _validate_and_get_resources(resources):
     resource_set = set()
@@ -74,9 +83,11 @@ def _execute_task(task):
         task.set_failed(exc, tb)
         _logger.info(_("Task %s failed (%s)"), task.pk, exc)
         _logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb))))
+        _send_task_notification(task)
     else:
         task.set_completed()
         _logger.info(_("Task completed %s"), task.pk)
+        _send_task_notification(task)
 
 
 def dispatch(
@@ -250,3 +261,32 @@ def cancel_task(task_id):
         cursor.execute("SELECT pg_notify('pulp_worker_cancel', %s)", (str(task.pk),))
         cursor.execute("NOTIFY pulp_worker_wakeup")
     return task
+
+
+def _send_task_notification(task):
+    kafka_producer = get_kafka_producer()
+    if kafka_producer is not None:
+        attributes = {
+            "type": "pulpcore.tasking.status",
+            "source": "pulpcore.tasking",
+            "datacontenttype": "application/json",
+            "dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml",
+        }
+        data = TaskStatusMessageSerializer(task, context={"request": None}).data
+        task_message = to_structured(CloudEvent(attributes, data))
+        kafka_producer.produce(
+            topic=_kafka_tasks_status_topic,
+            value=task_message.value,
+            key=task_message.key,
+            headers=task_message.headers,
+            on_delivery=_report_message_delivery,
+        )
+        if _kafka_tasks_status_producer_sync_enabled:
+            kafka_producer.flush()
+
+
+def _report_message_delivery(error, message):
+    if error is not None:
+        _logger.error(error)
+    elif _logger.isEnabledFor(logging.DEBUG):
+        _logger.debug(f"Message delivered successfully with contents {message.value}")
```
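For consumers, it helps to know what a structured-mode CloudEvent looks like on the wire: `to_structured` serializes the event attributes and data into a single JSON record value. The sketch below approximates that envelope with the stdlib; the field layout follows the CloudEvents spec, but the exact bytes in pulpcore come from the `cloudevents` library, and `task_data` here is hypothetical sample data:

```python
import json
import uuid
from datetime import datetime, timezone

# Hypothetical task data, shaped like TaskStatusMessageSerializer output.
task_data = {
    "pulp_href": "/pulp/api/v3/tasks/018f973c-ad7b-7f03-96d0-b38a42c18100/",
    "name": "pulp_file.app.tasks.synchronizing.synchronize",
    "state": "completed",
}

# A structured-mode CloudEvent carries all attributes and the data in one
# JSON document; binary mode would put attributes in Kafka headers instead.
envelope = {
    "specversion": "1.0",
    "id": str(uuid.uuid4()),
    "time": datetime.now(timezone.utc).isoformat(),
    "type": "pulpcore.tasking.status",
    "source": "pulpcore.tasking",
    "datacontenttype": "application/json",
    "dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml",
    "data": task_data,
}

# This is roughly what lands as the Kafka record value.
record_value = json.dumps(envelope).encode("utf-8")
print(json.loads(record_value)["data"]["state"])  # completed
```

A consumer can therefore `json.loads` the record value and read the task payload from the `data` key, using `dataref` to locate the schema.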

requirements.txt

Lines changed: 2 additions & 0 deletions
```diff
@@ -5,6 +5,8 @@ asyncio-throttle>=1.0,<=1.0.2
 async-timeout>=4.0.3,<4.0.4;python_version<"3.11"
 backoff>=2.1.2,<2.2.2
 click>=8.1.0,<=8.1.7
+cloudevents==1.10.1  # Pinned because project warns "things might (and will) break with every update"
+confluent-kafka~=2.4.0
 cryptography>=38.0.1,<42.0.9
 Django~=4.2.0  # LTS version, switch only if we have a compelling reason to
 django-filter>=23.1,<=24.2
```
Lines changed: 60 additions & 0 deletions
````markdown
# Integrate Kafka

Pulp can be configured to emit messages as tasks are created and executed.

Which settings are applicable depends on how the kafka broker is configured.

For a development preview of this functionality, the kafka profile from
[oci_env](https://github.com/pulp/oci_env/pull/159) can be used:

```
COMPOSE_PROFILE=kafka
```

After triggering task(s), any kafka consumer can be used to explore the resulting messages.
For convenience, the previously mentioned `oci_env` setup contains a CLI consumer that can be invoked as follows:

```shell
oci-env exec -s kafka \
    /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server=localhost:9092 \
    --offset earliest \
    --partition 0 \
    --topic pulpcore.tasking.status \
    --max-messages 1
```

## Common Configuration

`KAFKA_BOOTSTRAP_SERVERS` is a comma-separated list of hostname and port pairs. Setting this enables the kafka
integration.

Example values:

- `localhost:9092`
- `kafka1.example.com:9092,kafka2.example.com:9092`

## Authentication: Username/Password

In order to use username/password authentication, it's necessary to set an appropriate `KAFKA_SECURITY_PROTOCOL` value:

- `sasl_ssl` when the connection uses TLS.
- `sasl_plaintext` when the connection does not use TLS.

It's also necessary to set the appropriate value for `KAFKA_SASL_MECHANISM`; consult the kafka broker configuration.
Typical values include:

- `SCRAM-SHA-256`
- `SCRAM-SHA-512`

## TLS Settings

If the TLS truststore needs to be customized, then `KAFKA_SSL_CA_PEM` can be used to provide CA certs in PEM format.

!!! note
    The pulp kafka integration does not currently expose settings necessary for mTLS (client certificates).

## Other settings

See [Kafka Settings](../reference/settings.md#kafka-settings) for details.
````

staging_docs/admin/reference/settings.md

Lines changed: 54 additions & 0 deletions
```diff
@@ -403,3 +403,57 @@ If `True`, Pulp will anonymously post analytics information to
 `analytics docs ` for more info on exactly what is posted along with an example.
 
 Defaults to `True`.
+
+## Kafka Settings
+
+!!! note
+    Kafka integration functionality is in tech preview and may change based on user feedback.
+
+See [librdkafka configuration documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
+for details on client configuration properties.
+
+### KAFKA_BOOTSTRAP_SERVERS
+
+`bootstrap.servers` value for the client. Specifies endpoint(s) for the kafka client. Kafka integration is disabled if
+unspecified.
+
+### KAFKA_SECURITY_PROTOCOL
+
+`security.protocol` value for the client. What protocol to use for communication with the broker.
+
+Defaults to `plaintext` (unencrypted).
+
+### KAFKA_SSL_CA_PEM
+
+`ssl.ca.pem` value for the client (optional). Used to override the TLS truststore for broker connections.
+
+### KAFKA_SASL_MECHANISM
+
+`sasl.mechanisms` value for the client (optional). Specifies the authentication method used by the kafka broker.
+
+### KAFKA_SASL_USERNAME
+
+`sasl.username` value for the client (optional). Username for broker authentication.
+
+### KAFKA_SASL_PASSWORD
+
+`sasl.password` value for the client (optional). Password for broker authentication.
+
+### KAFKA_TASKS_STATUS_TOPIC
+
+What kafka topic to emit notifications to when tasks start/stop.
+
+Defaults to `pulpcore.tasking.status`.
+
+### KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED
+
+Whether to synchronously send task status messages. When `True`, the task message is sent synchronously; otherwise
+sends happen asynchronously, with a background thread periodically sending messages to the kafka server.
+
+Defaults to `False`.
+
+### KAFKA_PRODUCER_POLL_TIMEOUT
+
+Timeout in seconds for the kafka producer polling thread's `poll` calls.
+
+Defaults to `0.1`.
```
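Putting the settings together, a deployment authenticating with SASL/SCRAM over TLS might look like the following fragment (hostnames, username, and password are illustrative placeholders, not values from the source):

```python
# Hypothetical settings fragment for SASL/SCRAM over TLS.
KAFKA_BOOTSTRAP_SERVERS = "kafka1.example.com:9093,kafka2.example.com:9093"
KAFKA_SECURITY_PROTOCOL = "sasl_ssl"    # TLS transport + SASL authentication
KAFKA_SASL_MECHANISM = "SCRAM-SHA-512"  # must match the broker's listener config
KAFKA_SASL_USERNAME = "pulp"
KAFKA_SASL_PASSWORD = "example-password"
KAFKA_SSL_CA_PEM = None                 # set to CA certs in PEM format when using a private CA
```

With `KAFKA_BOOTSTRAP_SERVERS` set, the integration is enabled and task status messages are produced to `KAFKA_TASKS_STATUS_TOPIC`.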
