From 11323fe663c057ab3fe1ed3b016f233ef992befd Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 21 May 2025 11:30:24 +0200 Subject: [PATCH 1/3] set message durable as default releaded to https://github.com/rabbitmq/rabbitmq-server/pull/13918 Signed-off-by: Gabriele Santomaggio --- .../qpid/proton/_message.py | 46 +++++++++-------- tests/test_publisher.py | 51 +++++++++++++++---- 2 files changed, 65 insertions(+), 32 deletions(-) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 7968447..63d524b 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -110,7 +110,7 @@ class Message(object): """ Default AMQP message priority""" def __init__( - self, body: Union[bytes, None] = None, inferred=True, **kwargs + self, body: Union[bytes, None] = None, inferred=True, durable=True, **kwargs ) -> None: # validate the types @@ -120,6 +120,7 @@ def __init__( self.properties = None self.body = body self.inferred = inferred + self.durable = durable for k, v in kwargs.items(): getattr(self, k) # Raise exception if it's not a valid attribute. @@ -504,7 +505,7 @@ def instructions(self) -> Optional[AnnotationDict]: @instructions.setter def instructions( - self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(instructions, dict): self.instruction_dict = AnnotationDict(instructions, raise_on_error=False) @@ -527,7 +528,7 @@ def annotations(self) -> Optional[AnnotationDict]: @annotations.setter def annotations( - self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(annotations, dict): self.annotation_dict = AnnotationDict(annotations, raise_on_error=False) @@ -594,7 +595,8 @@ def send(self, sender: "Sender", tag: Optional[str] = None) -> "Delivery": return dlv @overload - def recv(self, link: "Sender") -> None: ... + def recv(self, link: "Sender") -> None: + ... def recv(self, link: "Receiver") -> Optional["Delivery"]: """ @@ -625,24 +627,24 @@ def recv(self, link: "Receiver") -> Optional["Delivery"]: def __repr__(self) -> str: props = [] for attr in ( - "inferred", - "address", - "reply_to", - "durable", - "ttl", - "priority", - "first_acquirer", - "delivery_count", - "id", - "correlation_id", - "user_id", - "group_id", - "group_sequence", - "reply_to_group_id", - "instructions", - "annotations", - "properties", - "body", + "inferred", + "address", + "reply_to", + "durable", + "ttl", + "priority", + "first_acquirer", + "delivery_count", + "id", + "correlation_id", + "user_id", + "group_id", + "group_sequence", + "reply_to_group_id", + "instructions", + "annotations", + "properties", + "body", ): value = getattr(self, attr) if value: diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 0847717..1cc856a 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -45,7 +45,6 @@ def test_validate_message_for_publishing(connection: Connection) -> None: def test_publish_queue(connection: Connection) -> None: - queue_name = "test-queue" management = connection.management() @@ -77,7 +76,6 @@ def test_publish_queue(connection: Connection) -> None: def test_publish_per_message(connection: Connection) -> None: - queue_name = "test-queue-1" queue_name_2 = "test-queue-2" management = connection.management() @@ -123,7 +121,6 @@ def test_publish_per_message(connection: Connection) -> None: def test_publish_ssl(connection_ssl: Connection) -> None: - queue_name = "test-queue" management = connection_ssl.management() @@ -148,7 +145,6 @@ def test_publish_ssl(connection_ssl: Connection) -> None: def test_publish_to_invalid_destination(connection: Connection) -> None: - queue_name = "test-queue" raised = False @@ -169,7 +165,6 @@ def test_publish_to_invalid_destination(connection: Connection) -> None: def test_publish_per_message_to_invalid_destination(connection: Connection) -> None: - queue_name = "test-queue-1" raised = False @@ -193,7 +188,6 @@ def test_publish_per_message_to_invalid_destination(connection: Connection) -> N def test_publish_per_message_both_address(connection: Connection) -> None: - queue_name = "test-queue-1" raised = False @@ -223,7 +217,6 @@ def test_publish_per_message_both_address(connection: Connection) -> None: def test_publish_exchange(connection: Connection) -> None: - exchange_name = "test-exchange" queue_name = "test-queue" management = connection.management() @@ -342,7 +335,6 @@ def test_disconnection_reconnection() -> None: def test_queue_info_for_stream_with_validations(connection: Connection) -> None: - stream_name = "test_stream_info_with_validation" messages_to_send = 200 @@ -361,7 +353,6 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None: def test_publish_per_message_exchange(connection: Connection) -> None: - exchange_name = "test-exchange-per-message" queue_name = "test-queue-per-message" management = connection.management() @@ -407,7 +398,6 @@ def test_publish_per_message_exchange(connection: Connection) -> None: def test_multiple_publishers(environment: Environment) -> None: - stream_name = "test_multiple_publisher_1" stream_name_2 = "test_multiple_publisher_2" connection = environment.connection() @@ -456,3 +446,44 @@ def test_multiple_publishers(environment: Environment) -> None: management.delete_queue(stream_name_2) management.close() + + +def test_durable_message(connection: Connection) -> None: + queue_name = "test_durable_message" + + management = connection.management() + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + destination = AddressHelper.queue_address(queue_name) + publisher = connection.publisher(destination) + # message should be durable by default + status = publisher.publish( + Message( + body=Converter.string_to_bytes("test"), + ) + ) + + assert status.remote_state == OutcomeState.ACCEPTED + # message should be not durable by setting the durable to False by the user + status = publisher.publish( + Message( + body=Converter.string_to_bytes("test"), + durable=False, + ) + ) + + assert status.remote_state == OutcomeState.ACCEPTED + + consumer = connection.consumer(destination) + should_be_durable = consumer.consume() + assert should_be_durable.durable is True + + should_be_not_durable = consumer.consume() + assert should_be_not_durable.durable is False + + consumer.close() + + management.delete_queue(queue_name) + + management.close() + + pass From 5f405ddddc96f76061835c2b7ab91f825b1384a1 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 26 May 2025 09:45:51 +0200 Subject: [PATCH 2/3] set message durable as default releaded to https://github.com/rabbitmq/rabbitmq-server/pull/13918 Signed-off-by: Gabriele Santomaggio --- .ci/ubuntu/rabbitmq.conf | 2 +- rabbitmq_amqp_python_client/management.py | 8 ++-- .../qpid/proton/_message.py | 45 +++++++++---------- tests/test_publisher.py | 12 ++--- 4 files changed, 32 insertions(+), 35 deletions(-) diff --git a/.ci/ubuntu/rabbitmq.conf b/.ci/ubuntu/rabbitmq.conf index 89b64f4..5ffa593 100644 --- a/.ci/ubuntu/rabbitmq.conf +++ b/.ci/ubuntu/rabbitmq.conf @@ -11,7 +11,7 @@ listeners.tcp.default = 5672 listeners.ssl.default = 5671 reverse_dns_lookups = false -deprecated_features.permit.amqp_address_v1 = false +# deprecated_features.permit.amqp_address_v1 = false ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem ssl_options.certfile = /etc/rabbitmq/certs/server_localhost_certificate.pem diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 6ff7487..a355185 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -125,14 +125,14 @@ def request( def _request( self, - id: str, + msg_id: str, body: Any, path: str, method: str, expected_response_codes: list[int], ) -> Message: amq_message = Message( - id=id, + id=msg_id, body=body, inferred=False, reply_to="$me", @@ -170,9 +170,7 @@ def declare_exchange( ValidationCodeException: If exchange already exists or other validation fails """ logger.debug("declare_exchange operation called") - body: dict[str, Any] = {} - body["auto_delete"] = exchange_specification.is_auto_delete - body["durable"] = exchange_specification.is_durable + body: dict[str, Any] = {"durable": exchange_specification.is_durable} if isinstance(exchange_specification, ExchangeSpecification): body["type"] = exchange_specification.exchange_type.value elif isinstance(exchange_specification, ExchangeCustomSpecification): diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 63d524b..05c1aea 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -110,7 +110,7 @@ class Message(object): """ Default AMQP message priority""" def __init__( - self, body: Union[bytes, None] = None, inferred=True, durable=True, **kwargs + self, body: Union[bytes, None] = None, inferred=True, durable=True, **kwargs ) -> None: # validate the types @@ -505,7 +505,7 @@ def instructions(self) -> Optional[AnnotationDict]: @instructions.setter def instructions( - self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(instructions, dict): self.instruction_dict = AnnotationDict(instructions, raise_on_error=False) @@ -528,7 +528,7 @@ def annotations(self) -> Optional[AnnotationDict]: @annotations.setter def annotations( - self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(annotations, dict): self.annotation_dict = AnnotationDict(annotations, raise_on_error=False) @@ -595,8 +595,7 @@ def send(self, sender: "Sender", tag: Optional[str] = None) -> "Delivery": return dlv @overload - def recv(self, link: "Sender") -> None: - ... + def recv(self, link: "Sender") -> None: ... def recv(self, link: "Receiver") -> Optional["Delivery"]: """ @@ -627,24 +626,24 @@ def recv(self, link: "Receiver") -> Optional["Delivery"]: def __repr__(self) -> str: props = [] for attr in ( - "inferred", - "address", - "reply_to", - "durable", - "ttl", - "priority", - "first_acquirer", - "delivery_count", - "id", - "correlation_id", - "user_id", - "group_id", - "group_sequence", - "reply_to_group_id", - "instructions", - "annotations", - "properties", - "body", + "inferred", + "address", + "reply_to", + "durable", + "ttl", + "priority", + "first_acquirer", + "delivery_count", + "id", + "correlation_id", + "user_id", + "group_id", + "group_sequence", + "reply_to_group_id", + "instructions", + "annotations", + "properties", + "body", ): value = getattr(self, attr) if value: diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 1cc856a..0b4ee4a 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -458,7 +458,7 @@ def test_durable_message(connection: Connection) -> None: # message should be durable by default status = publisher.publish( Message( - body=Converter.string_to_bytes("test"), + body=Converter.string_to_bytes("durable"), ) ) @@ -466,7 +466,7 @@ def test_durable_message(connection: Connection) -> None: # message should be not durable by setting the durable to False by the user status = publisher.publish( Message( - body=Converter.string_to_bytes("test"), + body=Converter.string_to_bytes("not durable"), durable=False, ) ) @@ -474,11 +474,11 @@ def test_durable_message(connection: Connection) -> None: assert status.remote_state == OutcomeState.ACCEPTED consumer = connection.consumer(destination) - should_be_durable = consumer.consume() - assert should_be_durable.durable is True + # should_be_durable = consumer.consume() + # assert should_be_durable.durable is True - should_be_not_durable = consumer.consume() - assert should_be_not_durable.durable is False + # should_be_not_durable = consumer.consume() + # assert should_be_not_durable.durable is False consumer.close() From 3fdd95927ec564058e5cf2a774bdb3c84eb79966 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 26 May 2025 10:35:20 +0200 Subject: [PATCH 3/3] set message durable as default releaded to https://github.com/rabbitmq/rabbitmq-server/pull/13918 Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/management.py | 1 + tests/test_publisher.py | 18 +++++++----------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index a355185..7a64f2f 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -138,6 +138,7 @@ def _request( reply_to="$me", address=path, subject=method, + durable=False, ) if self._sender is not None: diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 0b4ee4a..02e337f 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -474,16 +474,12 @@ def test_durable_message(connection: Connection) -> None: assert status.remote_state == OutcomeState.ACCEPTED consumer = connection.consumer(destination) - # should_be_durable = consumer.consume() - # assert should_be_durable.durable is True - - # should_be_not_durable = consumer.consume() - # assert should_be_not_durable.durable is False - - consumer.close() + should_be_durable = consumer.consume() + assert should_be_durable.durable is True + should_be_not_durable = consumer.consume() + assert should_be_not_durable.durable is False + message_count = management.purge_queue(queue_name) + assert message_count == 0 management.delete_queue(queue_name) - - management.close() - - pass + consumer.close()