-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreceiver.py
36 lines (28 loc) · 1.16 KB
/
receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import pika
from threading import Thread
class ReceiverBroker():
    """Consumer for a RabbitMQ fanout exchange, built on pika's BlockingConnection.

    Typical use: ``connect(exchange)`` once, then either ``listen_channel(cb)``
    (blocks the calling thread) or ``async_consumer(cb)`` (consumes on a
    background thread), and finally ``discard_channel()`` to end consumption.

    ``user`` and ``handler`` are stored on the instance for the caller's
    benefit; none of the methods in this class read them.
    """

    def __init__(self, user=None, handler=None):
        """Store the caller-supplied ``user`` and ``handler``; no I/O happens here."""
        self.user = user
        self.handler = handler
        # Populated by connect(). Initialised up front so the other methods
        # can detect "not connected yet" instead of raising AttributeError.
        self.exchange = None
        self.connection = None
        self.channel = None
        self.queue_name = None
        # Most recent thread started by async_consumer(), if any.
        self.worker = None

    def connect(self, exchange=None, host='localhost'):
        """Open a connection and bind a private queue to a fanout exchange.

        Args:
            exchange: Name of the exchange to declare (type ``fanout``) and
                bind to. Passed straight to pika; a real name is required
                for the declare call to succeed.
            host: Broker hostname. Defaults to ``'localhost'``, which is what
                this method always used before the parameter existed.

        Side effects:
            Sets ``exchange``, ``connection``, ``channel`` and ``queue_name``
            on the instance and prints a "waiting" banner.
        """
        self.exchange = exchange
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(
            exchange=exchange, exchange_type='fanout')
        # An empty queue name asks the broker to pick one; the generated name
        # comes back in the declare result and is needed for bind/consume.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange, queue=self.queue_name)
        print(' [*] Waiting for msgs. To exit press CTRL+C')

    def listen_channel(self, cb):
        """Consume from the bound queue until consumption is stopped.

        Blocks the calling thread inside ``start_consuming()``. Messages are
        delivered with ``auto_ack=True``, so they are never explicitly
        acknowledged by this class.

        Args:
            cb: pika ``on_message_callback``; per the pika documentation it is
                invoked as ``cb(channel, method, properties, body)``.
        """
        self.channel.basic_consume(
            queue=self.queue_name, on_message_callback=cb, auto_ack=True)
        self.channel.start_consuming()
        print("shutdown broker!")

    def async_consumer(self, cb):
        """Run ``listen_channel(cb)`` on a new background thread.

        Args:
            cb: Message callback, forwarded unchanged to ``listen_channel``.

        Returns:
            The started ``threading.Thread`` (also kept on ``self.worker``) so
            the caller can ``join()`` it after ``discard_channel()``.
        """
        worker = Thread(target=self.listen_channel, args=[cb])
        worker.start()
        self.worker = worker
        return worker

    def discard_channel(self):
        """Ask the consume loop to stop; safe to call from any thread.

        pika's BlockingConnection is not thread-safe, and ``async_consumer``
        runs the consume loop on a different thread from the one that usually
        calls this method. The stop request is therefore handed to the
        connection via ``add_callback_threadsafe`` so that
        ``stop_consuming()`` executes on the thread that owns the connection.

        Does nothing when ``connect()`` has not been called or the connection
        is no longer open. If no consume loop is running yet, the request
        stays queued on the connection and is honoured the next time the
        connection processes events.
        """
        connection = self.connection
        channel = self.channel
        if connection is None or channel is None:
            return
        if not connection.is_open:
            return
        connection.add_callback_threadsafe(channel.stop_consuming)
        # NOTE(review): the connection is still left open, as in the original
        # (its close() call was commented out). If it should be closed, do so
        # on the consumer thread once start_consuming() has returned.