forked from pulp/pulpcore
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add kafka producer to send task status messages
To try locally, you can use the oci-env kafka profile from pulp/oci_env#159. Set up the oci-env to use the kafka profile: ``` COMPOSE_PROFILE=kafka ``` From a fresh oci-env pulp instance, try: ```shell export REPO_NAME=$(head /dev/urandom | tr -dc a-z | head -c5) export REMOTE_NAME=$(head /dev/urandom | tr -dc a-z | head -c5) oci-env pulp file repository create --name $REPO_NAME oci-env pulp file remote create --name $REMOTE_NAME \ --url 'https://fixtures.pulpproject.org/file/PULP_MANIFEST' oci-env pulp file repository sync --name $REPO_NAME --remote $REMOTE_NAME ``` Then inspect the kafka message that is produced via: ```shell oci-env exec -s kafka \ /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server=localhost:9092 \ --offset earliest \ --partition 0 \ --topic pulpcore.tasking.status \ --max-messages 1 ``` Closes pulp#5337
- Loading branch information
Showing
8 changed files
with
306 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
$schema: http://json-schema.org/draft-07/hyper-schema | ||
$id: https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml | ||
type: object | ||
properties: | ||
pulp_href: | ||
description: URI for the task in the pulp API | ||
type: string | ||
examples: | ||
- /pulp/api/v3/tasks/018f973c-ad7b-7f03-96d0-b38a42c18100/ | ||
pulp_created: | ||
description: Created timestamp for the task | ||
type: string | ||
format: date-time | ||
examples: | ||
- 2024-05-20T18:21:27.292394Z | ||
pulp_last_updated: | ||
description: Last updated timestamp for the task | ||
type: string | ||
format: date-time | ||
examples: | ||
- 2024-05-20T18:21:27.292405Z | ||
name: | ||
description: Name of the task | ||
type: string | ||
examples: | ||
- pulp_file.app.tasks.synchronizing.synchronize | ||
state: | ||
description: State of the task | ||
type: string | ||
enum: | ||
- waiting | ||
- skipped | ||
- running | ||
- completed | ||
- failed | ||
- canceled | ||
- canceling | ||
unblocked_at: | ||
description: The time the task became unblocked | ||
type: string | ||
format: date-time | ||
examples: | ||
- 2024-05-20T18:21:27.317792Z | ||
started_at: | ||
description: The time the task started executing | ||
type: string | ||
format: date-time | ||
examples: | ||
- 2024-05-20T18:21:27.349481Z | ||
finished_at: | ||
description: The time the task finished executing | ||
type: string | ||
format: date-time | ||
examples: | ||
- 2024-05-20T18:21:28.074560Z |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import atexit | ||
import logging | ||
import socket | ||
from threading import Thread | ||
from typing import Optional | ||
|
||
from confluent_kafka import Producer | ||
from django.conf import settings | ||
|
||
_logger = logging.getLogger(__name__) | ||
_kafka_producer = None | ||
_bootstrap_servers = settings.get("KAFKA_BOOTSTRAP_SERVERS") | ||
_producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT") | ||
_security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL") | ||
_ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM") | ||
_sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM") | ||
_sasl_username = settings.get("KAFKA_SASL_USERNAME") | ||
_sasl_password = settings.get("KAFKA_SASL_PASSWORD") | ||
|
||
|
||
class KafkaProducerPollingWorker: | ||
def __init__(self, kafka_producer): | ||
self._kafka_producer = kafka_producer | ||
self._running = False | ||
self._thread = None | ||
|
||
def start(self): | ||
self._running = True | ||
self._thread = Thread(target=self._run) | ||
self._thread.start() | ||
|
||
def _run(self): | ||
while self._running: | ||
self._kafka_producer.poll(_producer_poll_timeout) | ||
self._kafka_producer.flush() | ||
|
||
def stop(self): | ||
self._running = False | ||
self._thread.join() | ||
|
||
|
||
def get_kafka_producer() -> Optional[Producer]: | ||
global _kafka_producer | ||
if _bootstrap_servers is None: | ||
return None | ||
if _kafka_producer is None: | ||
conf = { | ||
"bootstrap.servers": _bootstrap_servers, | ||
"security.protocol": _security_protocol, | ||
"client.id": socket.gethostname(), | ||
} | ||
optional_conf = { | ||
"ssl.ca.pem": _ssl_ca_pem, | ||
"sasl.mechanisms": _sasl_mechanism, | ||
"sasl.username": _sasl_username, | ||
"sasl.password": _sasl_password, | ||
} | ||
for key, value in optional_conf.items(): | ||
if value: | ||
conf[key] = value | ||
_kafka_producer = Producer(conf, logger=_logger) | ||
polling_worker = KafkaProducerPollingWorker(_kafka_producer) | ||
polling_worker.start() | ||
atexit.register(polling_worker.stop) | ||
return _kafka_producer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# Integrate Kafka | ||
|
||
Pulp can be configured to emit messages as tasks are created and executed. | ||
|
||
Kafka configuration depends on how the kafka broker is configured. Which settings are applicable depends on the broker | ||
configuration. | ||
|
||
For a development preview of this functionality, the kafka profile from | ||
[oci_env](https://github.com/pulp/oci_env/pull/159) can be used: | ||
|
||
``` | ||
COMPOSE_PROFILE=kafka | ||
``` | ||
|
||
After triggering task(s) any kafka consumer can be used to explore the resulting messages. | ||
For convenience, the previously mentioned `oci_env` setup contains a CLI consumer that can be invoked as follows: | ||
|
||
```shell | ||
oci-env exec -s kafka \ | ||
/opt/kafka/bin/kafka-console-consumer.sh \ | ||
--bootstrap-server=localhost:9092 \ | ||
--offset earliest \ | ||
--partition 0 \ | ||
--topic pulpcore.tasking.status \ | ||
--max-messages 1 | ||
``` | ||
|
||
## Common Configuration | ||
|
||
`KAFKA_BOOTSTRAP_SERVERS` is a comma-separated list of hostname and port pairs. Setting this enables the kafka | ||
integration. | ||
|
||
Example values: | ||
|
||
- `localhost:9092` | ||
- `kafka1.example.com:9092,kafka2.example.com:9092` | ||
|
||
## Authentication: Username/Password | ||
|
||
In order to use username/password authentication, it's necessary to set an appropriate `KAFKA_SECURITY_PROTOCOL` value: | ||
|
||
- `sasl_ssl` when the connection uses TLS. | ||
- `sasl_plaintext` when the connection does not use TLS. | ||
|
||
It's also necessary to set the appropriate value for `KAFKA_SASL_MECHANISM`; consult kafka broker configuration, typical | ||
values include: | ||
|
||
- `SCRAM-SHA-256` | ||
- `SCRAM-SHA-512` | ||
|
||
## TLS Settings | ||
|
||
If the TLS truststore needs to be customized, then `KAFKA_SSL_CA_PEM` can be used to provide CA certs in PEM format. | ||
|
||
!!! note | ||
The pulp kafka integration does not currently expose settings necessary for mTLS (client certificates). | ||
|
||
## Other settings | ||
|
||
See [Kafka Settings](../reference/settings.md#kafka-settings) for details. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters