From 07a26170119df143e8bfdd058607509736d6125a Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Thu, 6 Feb 2025 10:33:57 +0100 Subject: [PATCH] improve examples / updating docs / adding credits to consumer --- README.md | 29 ++- examples/getting_started/basic_example.py | 25 +-- .../getting_started/example_with_streams.py | 36 ++-- .../getting_started/reconnection_example.py | 6 +- examples/getting_started/tls_example.py | 168 ++++++++++++++++++ rabbitmq_amqp_python_client/__init__.py | 3 +- rabbitmq_amqp_python_client/connection.py | 5 +- rabbitmq_amqp_python_client/consumer.py | 5 + 8 files changed, 239 insertions(+), 38 deletions(-) create mode 100644 examples/getting_started/tls_example.py diff --git a/README.md b/README.md index 85313c5..eeb3d05 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-pyt ## Getting Started -An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with: +An example is provided [`here`](./examples/getting_started/basic_example.py) you can run it after starting a RabbitMQ 4.0 broker with: poetry run python ./examples/getting_started/main.py @@ -109,6 +109,33 @@ Then from connection get a consumer object: The consumer will run indefinitively waiting for messages to arrive. +### Support for streams + +The client natively supports streams: https://www.rabbitmq.com/blog/2021/07/13/rabbitmq-streams-overview + +You can consume from a given offset or specify a default starting point (FIRST, NEXT, LAST). + +Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering + +You can check the [`stream example`](./examples/getting_started/example_with_streams.py) to see how to work with RabbitMQ streams. + +### SSL + +The client supports TLS/SSL connections. + +You can check the [`ssl example`](./examples/getting_started/tls_example.py) to see how to establish a secured connection + + +### Managing disconnection + +At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected. +You can use this callback to implement your own logic and eventually attempt a reconnection. + +You can check the [`reconnection example`](./examples/getting_started/reconnection_example.py) to see how to manage disconnections and +eventually attempt a reconnection + + + diff --git a/examples/getting_started/basic_example.py b/examples/getting_started/basic_example.py index baa253d..be816f4 100644 --- a/examples/getting_started/basic_example.py +++ b/examples/getting_started/basic_example.py @@ -6,13 +6,14 @@ AMQPMessagingHandler, BindingSpecification, Connection, + Disposition, Event, ExchangeSpecification, Message, QuorumQueueSpecification, ) -messages_to_publish = 100 +MESSAGES_TO_PUBLISH = 100 class MyMessageHandler(AMQPMessagingHandler): @@ -45,7 +46,7 @@ def on_message(self, event: Event): self._count = self._count + 1 - if self._count == messages_to_publish: + if self._count == MESSAGES_TO_PUBLISH: print("closing receiver") # if you want you can add cleanup operations here # event.receiver.close() @@ -62,17 +63,6 @@ def on_link_closed(self, event: Event) -> None: def create_connection() -> Connection: connection = Connection("amqp://guest:guest@localhost:5672/") - # in case of SSL enablement - # ca_cert_file = ".ci/certs/ca_certificate.pem" - # client_cert = ".ci/certs/client_certificate.pem" - # client_key = ".ci/certs/client_key.pem" - # connection = Connection( - # "amqps://guest:guest@localhost:5671/", - # ssl_context=SslConfigurationContext( - # ca_cert=ca_cert_file, - # client_cert=ClientCert(client_cert=client_cert, client_key=client_key), - # ), - # ) connection.dial() return connection @@ -120,13 +110,14 @@ def main() -> None: # management.close() # publish 10 messages - for i in range(messages_to_publish): + for i in range(MESSAGES_TO_PUBLISH): + print("publishing") status = publisher.publish(Message(body="test")) - if status.ACCEPTED: + if status.remote_state == Disposition.ACCEPTED: print("message accepted") - elif status.RELEASED: + elif status.remote_state == Disposition.RELEASED: print("message not routed") - elif status.REJECTED: + elif status.remote_state == Disposition.REJECTED: print("message not rejected") publisher.close() diff --git a/examples/getting_started/example_with_streams.py b/examples/getting_started/example_with_streams.py index 0ff9df0..b4b55c0 100644 --- a/examples/getting_started/example_with_streams.py +++ b/examples/getting_started/example_with_streams.py @@ -11,6 +11,8 @@ StreamSpecification, ) +MESSAGES_TO_PUBLISH = 100 + class MyMessageHandler(AMQPMessagingHandler): @@ -19,6 +21,7 @@ def __init__(self): self._count = 0 def on_message(self, event: Event): + # just messages with banana filters get received print( "received message from stream: " + str(event.message.body) @@ -47,7 +50,7 @@ def on_message(self, event: Event): self._count = self._count + 1 - if self._count == 100: + if self._count == MESSAGES_TO_PUBLISH: print("closing receiver") # if you want you can add cleanup operations here # event.receiver.close() @@ -64,17 +67,6 @@ def on_link_closed(self, event: Event) -> None: def create_connection() -> Connection: connection = Connection("amqp://guest:guest@localhost:5672/") - # in case of SSL enablement - # ca_cert_file = ".ci/certs/ca_certificate.pem" - # client_cert = ".ci/certs/client_certificate.pem" - # client_key = ".ci/certs/client_key.pem" - # connection = Connection( - # "amqps://guest:guest@localhost:5671/", - # ssl_context=SslConfigurationContext( - # ca_cert=ca_cert_file, - # client_cert=ClientCert(client_cert=client_cert, client_key=client_key), - # ), - # ) connection.dial() return connection @@ -82,7 +74,6 @@ def create_connection() -> Connection: def main() -> None: queue_name = "example-queue" - messages_to_publish = 100 print("connection to amqp server") connection = create_connection() @@ -99,6 +90,7 @@ def main() -> None: # can be first, last, next or an offset long # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered stream_filter_options.offset(OffsetSpecification.first) + stream_filter_options.apply_filters(["banana"]) consumer = consumer_connection.consumer( addr_queue, @@ -112,8 +104,22 @@ def main() -> None: # print("create a publisher and publish a test message") publisher = connection.publisher(addr_queue) - for i in range(messages_to_publish): - publisher.publish(Message(body="test: " + str(i))) + # publish with a filter of apple + for i in range(MESSAGES_TO_PUBLISH): + publisher.publish( + Message( + body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"} + ) + ) + + # publish with a filter of banana + for i in range(MESSAGES_TO_PUBLISH): + publisher.publish( + Message( + body="banana: " + str(i), + annotations={"x-stream-filter-value": "banana"}, + ) + ) publisher.close() diff --git a/examples/getting_started/reconnection_example.py b/examples/getting_started/reconnection_example.py index 0003eb5..8db4201 100644 --- a/examples/getting_started/reconnection_example.py +++ b/examples/getting_started/reconnection_example.py @@ -31,7 +31,7 @@ class ConnectionConfiguration: connection_configuration = ConnectionConfiguration() -messages_to_publish = 50000 +MESSAGES_TO_PUBLSH = 50000 # disconnection callback @@ -95,7 +95,7 @@ def on_message(self, event: Event): self._count = self._count + 1 - if self._count == messages_to_publish: + if self._count == MESSAGES_TO_PUBLSH: print("closing receiver") # if you want you can add cleanup operations here # event.receiver.close() @@ -181,7 +181,7 @@ def main() -> None: # publishing messages while True: - for i in range(messages_to_publish): + for i in range(MESSAGES_TO_PUBLSH): if i % 1000 == 0: print("published 1000 messages...") diff --git a/examples/getting_started/tls_example.py b/examples/getting_started/tls_example.py new file mode 100644 index 0000000..f159e75 --- /dev/null +++ b/examples/getting_started/tls_example.py @@ -0,0 +1,168 @@ +# type: ignore + + +from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, + AddressHelper, + AMQPMessagingHandler, + BindingSpecification, + ClientCert, + Connection, + Event, + ExchangeSpecification, + Message, + QuorumQueueSpecification, + SslConfigurationContext, +) + +messages_to_publish = 100 + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_message(self, event: Event): + print("received message: " + str(event.message.body)) + + # accepting + self.delivery_context.accept(event) + + # in case of rejection (+eventually deadlettering) + # self.delivery_context.discard(event) + + # in case of requeuing + # self.delivery_context.requeue(event) + + # annotations = {} + # annotations[symbol('x-opt-string')] = 'x-test1' + # in case of requeuing with annotations added + # self.delivery_context.requeue_with_annotations(event, annotations) + + # in case of rejection with annotations added + # self.delivery_context.discard_with_annotations(event) + + print("count " + str(self._count)) + + self._count = self._count + 1 + + if self._count == messages_to_publish: + print("closing receiver") + # if you want you can add cleanup operations here + # event.receiver.close() + # event.connection.close() + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection() -> Connection: + # in case of SSL enablement + ca_cert_file = ".ci/certs/ca_certificate.pem" + client_cert = ".ci/certs/client_certificate.pem" + client_key = ".ci/certs/client_key.pem" + connection = Connection( + "amqps://guest:guest@localhost:5671/", + ssl_context=SslConfigurationContext( + ca_cert=ca_cert_file, + client_cert=ClientCert(client_cert=client_cert, client_key=client_key), + ), + ) + connection.dial() + + return connection + + +def main() -> None: + + exchange_name = "test-exchange" + queue_name = "example-queue" + routing_key = "routing-key" + + print("connection to amqp server") + connection = create_connection() + + management = connection.management() + + print("declaring exchange and queue") + management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + + management.declare_queue( + QuorumQueueSpecification(name=queue_name) + # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") + ) + + print("binding queue to exchange") + bind_name = management.bind( + BindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + binding_key=routing_key, + ) + ) + + addr = AddressHelper.exchange_address(exchange_name, routing_key) + + addr_queue = AddressHelper.queue_address(queue_name) + + print("create a publisher and publish a test message") + publisher = connection.publisher(addr) + + print("purging the queue") + messages_purged = management.purge_queue(queue_name) + + print("messages purged: " + str(messages_purged)) + # management.close() + + # publish 10 messages + for i in range(messages_to_publish): + status = publisher.publish(Message(body="test")) + if status.ACCEPTED: + print("message accepted") + elif status.RELEASED: + print("message not routed") + elif status.REJECTED: + print("message not rejected") + + publisher.close() + + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + + try: + consumer.run() + except KeyboardInterrupt: + pass + + print("cleanup") + consumer.close() + # once we finish consuming if we close the connection we need to create a new one + # connection = create_connection() + # management = connection.management() + + print("unbind") + management.unbind(bind_name) + + print("delete queue") + management.delete_queue(queue_name) + + print("delete exchange") + management.delete_exchange(exchange_name) + + print("closing connections") + management.close() + print("after management closing") + connection.close() + print("after connection closing") + + +if __name__ == "__main__": + main() diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 1a3e997..3e8f86c 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -15,7 +15,7 @@ from .management import Management from .publisher import Publisher from .qpid.proton._data import symbol # noqa: E402 -from .qpid.proton._delivery import Delivery +from .qpid.proton._delivery import Delivery, Disposition from .qpid.proton._events import Event from .qpid.proton._message import Message from .qpid.proton._utils import ConnectionClosed @@ -65,4 +65,5 @@ "ConnectionClosed", "StreamOptions", "OffsetSpecification", + "Disposition", ] diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index c8480b7..dac9a14 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -89,10 +89,13 @@ def consumer( destination: str, handler: Optional[MessagingHandler] = None, stream_filter_options: Optional[StreamOptions] = None, + credit: Optional[int] = None, ) -> Consumer: if validate_address(destination) is False: raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) - consumer = Consumer(self._conn, destination, handler, stream_filter_options) + consumer = Consumer( + self._conn, destination, handler, stream_filter_options, credit + ) return consumer diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 729f247..87a6996 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -23,12 +23,14 @@ def __init__( addr: str, handler: Optional[MessagingHandler] = None, stream_options: Optional[StreamOptions] = None, + credit: Optional[int] = None, ): self._receiver: Optional[BlockingReceiver] = None self._conn = conn self._addr = addr self._handler = handler self._stream_options = stream_options + self._credit = credit self._open() def _open(self) -> None: @@ -70,4 +72,7 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: handler=self._handler, ) + if self._credit is not None: + receiver.credit = self._credit + return receiver