-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscript.py
164 lines (134 loc) · 5.62 KB
/
script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# Updated On Nov 08, 2023.
"""
KafkaClient - Class for Kafka Producer and Consumer
This Python class, KafkaClient, encapsulates Kafka producer and consumer functionality using the
confluent-kafka library. It provides a convenient way to create a Kafka producer and consumer, produce and consume
messages, and close the Kafka client.
Usage:
1. Create an instance of KafkaClient with the Kafka broker address and group ID.
2. Use create_producer() to create a Kafka producer.
3. Use produce_message() to send messages to a specified Kafka topic.
4. Use create_consumer() to create a Kafka consumer and subscribe to a topic.
5. Use consume_messages() to start consuming messages.
6. Use close() to gracefully close the Kafka client.
Example:
    if __name__ == '__main__':
        bootstrap_servers = 'your_kafka_broker_address:9092'
        topic_name = 'your_topic_name'
        kafka_client = KafkaClient(bootstrap_servers, 'python-consumer-group')
        kafka_client.create_producer()
        kafka_client.produce_message(topic_name, 'key1', 'Hello, Kafka!')
        kafka_client.create_consumer([topic_name])
        kafka_client.consume_messages()
        kafka_client.close()
Author: Yasir Ali
LinkedIn: https://www.linkedin.com/in/yasirali179/
"""
from confluent_kafka import Producer, Consumer, KafkaError
class KafkaClient:
    """
    A simple Kafka client for producing and consuming messages using the confluent-kafka-python library.

    Parameters:
    - bootstrap_servers (str): The Kafka broker address in the format 'hostname:port'.
    - group_id (str): The consumer group ID for the Kafka consumer.
    - auto_offset_reset (str, optional): The initial offset for the Kafka consumer. Default is 'earliest'.

    Attributes:
    - bootstrap_servers (str): The Kafka broker address.
    - group_id (str): The consumer group ID.
    - auto_offset_reset (str): The initial offset for the Kafka consumer.
    - producer (confluent_kafka.Producer): The Kafka producer instance, or None until create_producer() is called.
    - consumer (confluent_kafka.Consumer): The Kafka consumer instance, or None until create_consumer() is called.

    Methods:
    - create_producer(): Create a Kafka producer instance.
    - create_consumer(topics): Create a Kafka consumer instance and subscribe to the specified topics.
    - produce_message(topic, key, value): Produce a message to the specified topic with the given key and value.
    - consume_messages(): Consume messages from the subscribed topics and print them.
    - close(): Flush the Kafka producer and close the Kafka consumer.
    """

    def __init__(self, bootstrap_servers: str, group_id: str, auto_offset_reset: str = 'earliest') -> None:
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.auto_offset_reset = auto_offset_reset
        self.producer = None
        self.consumer = None

    def create_producer(self) -> None:
        """
        Create a Kafka producer instance and store it on self.producer.

        Returns:
        None
        """
        producer_config = {
            'bootstrap.servers': self.bootstrap_servers,
            'client.id': 'python-producer',
        }
        self.producer = Producer(producer_config)

    def create_consumer(self, topics: list) -> None:
        """
        Create a Kafka consumer instance and subscribe to the specified topics.

        Parameters:
        - topics (list): A list of topics to subscribe to.

        Returns:
        None
        """
        consumer_config = {
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': self.group_id,
            'auto.offset.reset': self.auto_offset_reset,
        }
        self.consumer = Consumer(consumer_config)
        self.consumer.subscribe(topics)

    def produce_message(self, topic: str, key, value) -> None:
        """
        Produce a message to the specified topic with the given key and value.

        The message is flushed immediately, so this call blocks until the
        producer's queue has been drained. If no producer has been created
        yet, the call does nothing.

        Parameters:
        - topic (str): The Kafka topic to produce the message to.
        - key: The key for the message.
        - value: The value of the message.

        Returns:
        None
        """
        # Compare against None explicitly: confluent_kafka.Producer implements
        # __len__ (number of queued messages), so an idle producer is falsy
        # and a plain truthiness test would silently drop the message.
        if self.producer is None:
            return
        self.producer.produce(topic, key=key, value=value)
        self.producer.flush()

    def consume_messages(self) -> None:
        """
        Consume messages from the subscribed topics and print them.

        Polls in an endless loop; it only stops when an exception (for
        example KeyboardInterrupt) propagates out of it. If no consumer has
        been created yet, the call returns immediately.

        Returns:
        None
        """
        if self.consumer is None:
            return
        while True:
            msg = self.consumer.poll(1.0)  # Poll for messages with a timeout
            if msg is None:
                # Nothing arrived within the timeout; poll again.
                continue
            error = msg.error()
            if not error:
                print('Received message: key={}, value={}'.format(msg.key(), msg.value()))
            elif error.code() == KafkaError._PARTITION_EOF:
                print("Reached end of partition")
            else:
                print("Error while consuming: {}".format(error))

    def close(self) -> None:
        """
        Flush the Kafka producer and close the Kafka consumer.

        Safe to call more than once: both handles are reset to None after
        they have been released.

        Returns:
        None
        """
        producer, self.producer = self.producer, None
        consumer, self.consumer = self.consumer, None
        try:
            if producer is not None:
                # Use flush() rather than close(): Producer.close() is not
                # available in all confluent-kafka releases, while flush()
                # delivers any messages still waiting in the queue.
                producer.flush()
        finally:
            # Always release the consumer, even if flushing failed.
            if consumer is not None:
                consumer.close()
if __name__ == '__main__':
    # Demo: produce one message, then consume from the same topic until
    # the user interrupts the script.

    # Define Kafka broker address and topic
    bootstrap_servers = 'your_kafka_broker_address:9092'
    topic_name = 'your_topic_name'

    # Create a KafkaClient instance
    kafka_client = KafkaClient(bootstrap_servers, 'python-consumer-group')

    try:
        # Create a Kafka producer and send a message
        kafka_client.create_producer()
        kafka_client.produce_message(topic_name, 'key1', 'Hello, Kafka!')

        # Create a Kafka consumer and consume messages
        kafka_client.create_consumer([topic_name])
        # consume_messages() loops until interrupted, so Ctrl-C is the
        # expected way to stop the demo.
        kafka_client.consume_messages()
    except KeyboardInterrupt:
        pass
    finally:
        # Close the Kafka client even when the consume loop was interrupted
        kafka_client.close()