-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbasic_example.py
158 lines (114 loc) · 4.33 KB
/
basic_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# type: ignore
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
AddressHelper,
AMQPMessagingHandler,
BindingSpecification,
Connection,
Disposition,
Event,
ExchangeSpecification,
Message,
QuorumQueueSpecification,
)
MESSAGES_TO_PUBLISH = 100
class MyMessageHandler(AMQPMessagingHandler):
def __init__(self):
super().__init__()
self._count = 0
def on_message(self, event: Event):
print("received message: " + str(event.message.body))
# accepting
self.delivery_context.accept(event)
# in case of rejection (+eventually deadlettering)
# self.delivery_context.discard(event)
# in case of requeuing
# self.delivery_context.requeue(event)
# annotations = {}
# annotations[symbol('x-opt-string')] = 'x-test1'
# in case of requeuing with annotations added
# self.delivery_context.requeue_with_annotations(event, annotations)
# in case of rejection with annotations added
# self.delivery_context.discard_with_annotations(event)
print("count " + str(self._count))
self._count = self._count + 1
if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
# event.connection.close()
def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
print("connection closed")
def on_link_closed(self, event: Event) -> None:
# if you want you can add cleanup operations here
print("link closed")
def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()
return connection
def main() -> None:
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
print("connection to amqp server")
connection = create_connection()
management = connection.management()
print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
management.declare_queue(
QuorumQueueSpecification(name=queue_name)
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
)
print("binding queue to exchange")
bind_name = management.bind(
BindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
binding_key=routing_key,
)
)
addr = AddressHelper.exchange_address(exchange_name, routing_key)
addr_queue = AddressHelper.queue_address(queue_name)
print("create a publisher and publish a test message")
publisher = connection.publisher(addr)
print("purging the queue")
messages_purged = management.purge_queue(queue_name)
print("messages purged: " + str(messages_purged))
# management.close()
# publish 10 messages
for i in range(MESSAGES_TO_PUBLISH):
print("publishing")
status = publisher.publish(Message(body="test"))
if status.remote_state == Disposition.ACCEPTED:
print("message accepted")
elif status.remote_state == Disposition.RELEASED:
print("message not routed")
elif status.remote_state == Disposition.REJECTED:
print("message not rejected")
publisher.close()
print(
"create a consumer and consume the test message - press control + c to terminate to consume"
)
consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler())
try:
consumer.run()
except KeyboardInterrupt:
pass
print("cleanup")
consumer.close()
# once we finish consuming if we close the connection we need to create a new one
# connection = create_connection()
# management = connection.management()
print("unbind")
management.unbind(bind_name)
print("delete queue")
management.delete_queue(queue_name)
print("delete exchange")
management.delete_exchange(exchange_name)
print("closing connections")
management.close()
print("after management closing")
connection.close()
print("after connection closing")
if __name__ == "__main__":
main()