Skip to content

Commit

Permalink
improve examples / updating docs / adding credits to consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Feb 6, 2025
1 parent d08139f commit 07a2617
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 38 deletions.
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-pyt

## Getting Started

An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
An example is provided [`here`](./examples/getting_started/basic_example.py) you can run it after starting a RabbitMQ 4.0 broker with:

poetry run python ./examples/getting_started/main.py

Expand Down Expand Up @@ -109,6 +109,33 @@ Then from connection get a consumer object:

The consumer will run indefinitively waiting for messages to arrive.

### Support for streams

The client natively supports streams: https://www.rabbitmq.com/blog/2021/07/13/rabbitmq-streams-overview

You can consume from a given offset or specify a default starting point (FIRST, NEXT, LAST).

Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering

You can check the [`stream example`](./examples/getting_started/example_with_streams.py) to see how to work with RabbitMQ streams.

### SSL

The client supports TLS/SSL connections.

You can check the [`ssl example`](./examples/getting_started/tls_example.py) to see how to establish a secured connection


### Managing disconnection

At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected.
You can use this callback to implement your own logic and eventually attempt a reconnection.

You can check the [`reconnection example`](./examples/getting_started/reconnection_example.py) to see how to manage disconnections and
eventually attempt a reconnection






25 changes: 8 additions & 17 deletions examples/getting_started/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
AMQPMessagingHandler,
BindingSpecification,
Connection,
Disposition,
Event,
ExchangeSpecification,
Message,
QuorumQueueSpecification,
)

messages_to_publish = 100
MESSAGES_TO_PUBLISH = 100


class MyMessageHandler(AMQPMessagingHandler):
Expand Down Expand Up @@ -45,7 +46,7 @@ def on_message(self, event: Event):

self._count = self._count + 1

if self._count == messages_to_publish:
if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
Expand All @@ -62,17 +63,6 @@ def on_link_closed(self, event: Event) -> None:

def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection
Expand Down Expand Up @@ -120,13 +110,14 @@ def main() -> None:
# management.close()

# publish 10 messages
for i in range(messages_to_publish):
for i in range(MESSAGES_TO_PUBLISH):
print("publishing")
status = publisher.publish(Message(body="test"))
if status.ACCEPTED:
if status.remote_state == Disposition.ACCEPTED:
print("message accepted")
elif status.RELEASED:
elif status.remote_state == Disposition.RELEASED:
print("message not routed")
elif status.REJECTED:
elif status.remote_state == Disposition.REJECTED:
print("message not rejected")

publisher.close()
Expand Down
36 changes: 21 additions & 15 deletions examples/getting_started/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
StreamSpecification,
)

MESSAGES_TO_PUBLISH = 100


class MyMessageHandler(AMQPMessagingHandler):

Expand All @@ -19,6 +21,7 @@ def __init__(self):
self._count = 0

def on_message(self, event: Event):
# just messages with banana filters get received
print(
"received message from stream: "
+ str(event.message.body)
Expand Down Expand Up @@ -47,7 +50,7 @@ def on_message(self, event: Event):

self._count = self._count + 1

if self._count == 100:
if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
Expand All @@ -64,25 +67,13 @@ def on_link_closed(self, event: Event) -> None:

def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection


def main() -> None:
queue_name = "example-queue"
messages_to_publish = 100

print("connection to amqp server")
connection = create_connection()
Expand All @@ -99,6 +90,7 @@ def main() -> None:
# can be first, last, next or an offset long
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
stream_filter_options.offset(OffsetSpecification.first)
stream_filter_options.apply_filters(["banana"])

consumer = consumer_connection.consumer(
addr_queue,
Expand All @@ -112,8 +104,22 @@ def main() -> None:
# print("create a publisher and publish a test message")
publisher = connection.publisher(addr_queue)

for i in range(messages_to_publish):
publisher.publish(Message(body="test: " + str(i)))
# publish with a filter of apple
for i in range(MESSAGES_TO_PUBLISH):
publisher.publish(
Message(
body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"}
)
)

# publish with a filter of banana
for i in range(MESSAGES_TO_PUBLISH):
publisher.publish(
Message(
body="banana: " + str(i),
annotations={"x-stream-filter-value": "banana"},
)
)

publisher.close()

Expand Down
6 changes: 3 additions & 3 deletions examples/getting_started/reconnection_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ConnectionConfiguration:


connection_configuration = ConnectionConfiguration()
messages_to_publish = 50000
MESSAGES_TO_PUBLSH = 50000


# disconnection callback
Expand Down Expand Up @@ -95,7 +95,7 @@ def on_message(self, event: Event):

self._count = self._count + 1

if self._count == messages_to_publish:
if self._count == MESSAGES_TO_PUBLSH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
Expand Down Expand Up @@ -181,7 +181,7 @@ def main() -> None:

# publishing messages
while True:
for i in range(messages_to_publish):
for i in range(MESSAGES_TO_PUBLSH):

if i % 1000 == 0:
print("published 1000 messages...")
Expand Down
168 changes: 168 additions & 0 deletions examples/getting_started/tls_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# type: ignore


from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
AddressHelper,
AMQPMessagingHandler,
BindingSpecification,
ClientCert,
Connection,
Event,
ExchangeSpecification,
Message,
QuorumQueueSpecification,
SslConfigurationContext,
)

messages_to_publish = 100


class MyMessageHandler(AMQPMessagingHandler):

def __init__(self):
super().__init__()
self._count = 0

def on_message(self, event: Event):
print("received message: " + str(event.message.body))

# accepting
self.delivery_context.accept(event)

# in case of rejection (+eventually deadlettering)
# self.delivery_context.discard(event)

# in case of requeuing
# self.delivery_context.requeue(event)

# annotations = {}
# annotations[symbol('x-opt-string')] = 'x-test1'
# in case of requeuing with annotations added
# self.delivery_context.requeue_with_annotations(event, annotations)

# in case of rejection with annotations added
# self.delivery_context.discard_with_annotations(event)

print("count " + str(self._count))

self._count = self._count + 1

if self._count == messages_to_publish:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
# event.connection.close()

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
print("connection closed")

def on_link_closed(self, event: Event) -> None:
# if you want you can add cleanup operations here
print("link closed")


def create_connection() -> Connection:
# in case of SSL enablement
ca_cert_file = ".ci/certs/ca_certificate.pem"
client_cert = ".ci/certs/client_certificate.pem"
client_key = ".ci/certs/client_key.pem"
connection = Connection(
"amqps://guest:guest@localhost:5671/",
ssl_context=SslConfigurationContext(
ca_cert=ca_cert_file,
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
),
)
connection.dial()

return connection


def main() -> None:

exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"

print("connection to amqp server")
connection = create_connection()

management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(
QuorumQueueSpecification(name=queue_name)
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
)

print("binding queue to exchange")
bind_name = management.bind(
BindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
binding_key=routing_key,
)
)

addr = AddressHelper.exchange_address(exchange_name, routing_key)

addr_queue = AddressHelper.queue_address(queue_name)

print("create a publisher and publish a test message")
publisher = connection.publisher(addr)

print("purging the queue")
messages_purged = management.purge_queue(queue_name)

print("messages purged: " + str(messages_purged))
# management.close()

# publish 10 messages
for i in range(messages_to_publish):
status = publisher.publish(Message(body="test"))
if status.ACCEPTED:
print("message accepted")
elif status.RELEASED:
print("message not routed")
elif status.REJECTED:
print("message not rejected")

publisher.close()

print(
"create a consumer and consume the test message - press control + c to terminate to consume"
)
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())

try:
consumer.run()
except KeyboardInterrupt:
pass

print("cleanup")
consumer.close()
# once we finish consuming if we close the connection we need to create a new one
# connection = create_connection()
# management = connection.management()

print("unbind")
management.unbind(bind_name)

print("delete queue")
management.delete_queue(queue_name)

print("delete exchange")
management.delete_exchange(exchange_name)

print("closing connections")
management.close()
print("after management closing")
connection.close()
print("after connection closing")


if __name__ == "__main__":
main()
Loading

0 comments on commit 07a2617

Please sign in to comment.