Skip to content

Commit

Permalink
chore: add sasl password and username
Browse files Browse the repository at this point in the history
  • Loading branch information
elisalimli committed Mar 16, 2024
1 parent 00c2ed4 commit ee7dc65
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
6 changes: 5 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ REDIS_PORT=6379

# Kafka
KAFKA_TOPIC_INGESTION=ingestion
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Comma separated list of kafka brokers (e.g. localhost:9092,localhost:9093)
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Comma separated list of kafka brokers (e.g. localhost:9092,localhost:9093)
KAFKA_SECURITY_PROTOCOL=
KAFKA_SASL_MECHANISM=
KAFKA_SASL_PLAIN_USERNAME=
KAFKA_SASL_PLAIN_PASSWORD=
9 changes: 7 additions & 2 deletions service/kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json

from kafka import KafkaConsumer
from decouple import config

from service.kafka.config import kafka_bootstrap_servers

Expand All @@ -9,10 +10,14 @@ def get_kafka_consumer(topic: str):
consumer = KafkaConsumer(
topic,
bootstrap_servers=kafka_bootstrap_servers,
group_id="my-group",
security_protocol=config("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
sasl_mechanism=config("KAFKA_SASL_MECHANISM", "PLAIN"),
sasl_plain_username=config("KAFKA_SASL_PLAIN_USERNAME", None),
sasl_plain_password=config("KAFKA_SASL_PLAIN_PASSWORD", None),
auto_offset_reset="earliest",
enable_auto_commit=False,
value_deserializer=lambda m: json.loads(m.decode("ascii")),
group_id="my-group",
enable_auto_commit=False,
)

return consumer
11 changes: 9 additions & 2 deletions service/kafka/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
from kafka import KafkaProducer

from decouple import config
from service.kafka.config import kafka_bootstrap_servers

kafka_producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
kafka_producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap_servers,
security_protocol=config("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
sasl_mechanism=config("KAFKA_SASL_MECHANISM", "PLAIN"),
sasl_plain_username=config("KAFKA_SASL_PLAIN_USERNAME", None),
sasl_plain_password=config("KAFKA_SASL_PLAIN_PASSWORD", None),
api_version_auto_timeout_ms=100000,
)

0 comments on commit ee7dc65

Please sign in to comment.