From ee7dc655aa2c48f10176d1dae46a1b4a3fddcc7e Mon Sep 17 00:00:00 2001 From: alisalim17 Date: Sat, 16 Mar 2024 11:14:29 +0400 Subject: [PATCH] chore: add sasl password and username --- .env.example | 6 +++++- service/kafka/consumer.py | 9 +++++++-- service/kafka/producer.py | 11 +++++++++-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/.env.example b/.env.example index a4f2a8ae..15d40430 100644 --- a/.env.example +++ b/.env.example @@ -26,4 +26,8 @@ REDIS_PORT=6379 # Kafka KAFKA_TOPIC_INGESTION=ingestion -KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Comma separated list of kafka brokers (e.g. localhost:9092,localhost:9093) \ No newline at end of file +KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Comma separated list of kafka brokers (e.g. localhost:9092,localhost:9093) +KAFKA_SECURITY_PROTOCOL= +KAFKA_SASL_MECHANISM= +KAFKA_SASL_PLAIN_USERNAME= +KAFKA_SASL_PLAIN_PASSWORD= \ No newline at end of file diff --git a/service/kafka/consumer.py b/service/kafka/consumer.py index c0e7bc4e..de150c2e 100644 --- a/service/kafka/consumer.py +++ b/service/kafka/consumer.py @@ -1,6 +1,7 @@ import json from kafka import KafkaConsumer +from decouple import config from service.kafka.config import kafka_bootstrap_servers @@ -9,10 +10,14 @@ def get_kafka_consumer(topic: str): consumer = KafkaConsumer( topic, bootstrap_servers=kafka_bootstrap_servers, + group_id="my-group", + security_protocol=config("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"), + sasl_mechanism=config("KAFKA_SASL_MECHANISM", "PLAIN"), + sasl_plain_username=config("KAFKA_SASL_PLAIN_USERNAME", None), + sasl_plain_password=config("KAFKA_SASL_PLAIN_PASSWORD", None), auto_offset_reset="earliest", - enable_auto_commit=False, value_deserializer=lambda m: json.loads(m.decode("ascii")), - group_id="my-group", + enable_auto_commit=False, ) return consumer diff --git a/service/kafka/producer.py b/service/kafka/producer.py index a4a65941..b1933bae 100644 --- a/service/kafka/producer.py +++ b/service/kafka/producer.py @@ -1,5 +1,12 @@ from kafka import KafkaProducer - +from decouple import config from service.kafka.config import kafka_bootstrap_servers -kafka_producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers) +kafka_producer = KafkaProducer( + bootstrap_servers=kafka_bootstrap_servers, + security_protocol=config("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"), + sasl_mechanism=config("KAFKA_SASL_MECHANISM", "PLAIN"), + sasl_plain_username=config("KAFKA_SASL_PLAIN_USERNAME", None), + sasl_plain_password=config("KAFKA_SASL_PLAIN_PASSWORD", None), + api_version_auto_timeout_ms=100000, +)