Skip to content

Commit 4146b5f

Browse files
Implement filters (#77)
* Implement filters based on message properties. part of: #42 --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 36bb249 commit 4146b5f

File tree

6 files changed

+312
-15
lines changed

6 files changed

+312
-15
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ format:
1212
poetry run black rabbitmq_amqp_python_client/
1313
poetry run black tests/
1414
poetry run flake8 --exclude=venv,.venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
15+
poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid .
1516

1617
test: format
1718
poetry run pytest .

examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ Client examples
44
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
55
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
66
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
7-
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token
7+
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token
8+
- [Streams with filters](./streams_with_filters/example_streams_with_filters.py) - Example supporting stream capabilities with filters
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# type: ignore
2+
import logging
3+
import time
4+
5+
from rabbitmq_amqp_python_client import (
6+
AddressHelper,
7+
AMQPMessagingHandler,
8+
Connection,
9+
ConnectionClosed,
10+
Converter,
11+
Environment,
12+
Event,
13+
Message,
14+
MessageProperties,
15+
OffsetSpecification,
16+
StreamConsumerOptions,
17+
StreamFilterOptions,
18+
StreamSpecification,
19+
)
20+
21+
MESSAGES_TO_PUBLISH = 100
22+
23+
24+
class MyMessageHandler(AMQPMessagingHandler):
25+
26+
def __init__(self):
27+
super().__init__()
28+
self._count = 0
29+
30+
def on_amqp_message(self, event: Event):
31+
# only messages with banana filters and with subject yellow
32+
self._count = self._count + 1
33+
logger.info(
34+
"Received message: {}, subject {}.[Total Consumed: {}]".format(
35+
Converter.bytes_to_string(event.message.body),
36+
event.message.subject,
37+
self._count,
38+
)
39+
)
40+
self.delivery_context.accept(event)
41+
42+
def on_connection_closed(self, event: Event):
43+
# if you want you can add cleanup operations here
44+
print("connection closed")
45+
46+
def on_link_closed(self, event: Event) -> None:
47+
# if you want you can add cleanup operations here
48+
print("link closed")
49+
50+
51+
def create_connection(environment: Environment) -> Connection:
52+
connection = environment.connection()
53+
connection.dial()
54+
55+
return connection
56+
57+
58+
logging.basicConfig()
59+
logger = logging.getLogger("[streams_with_filters]")
60+
logger.setLevel(logging.INFO)
61+
62+
63+
def main() -> None:
64+
"""
65+
In this example we create a stream queue and a consumer with filtering options.
66+
The example combines two filters:
67+
- filter value: banana
68+
- subject: yellow
69+
70+
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
71+
"""
72+
73+
queue_name = "stream-example-with_filtering-queue"
74+
logger.info("Creating connection")
75+
environment = Environment("amqp://guest:guest@localhost:5672/")
76+
connection = create_connection(environment)
77+
management = connection.management()
78+
# delete the queue if it exists
79+
management.delete_queue(queue_name)
80+
# create a stream queue
81+
management.declare_queue(StreamSpecification(name=queue_name))
82+
83+
addr_queue = AddressHelper.queue_address(queue_name)
84+
85+
consumer_connection = create_connection(environment)
86+
87+
consumer = consumer_connection.consumer(
88+
addr_queue,
89+
message_handler=MyMessageHandler(),
90+
# the consumer will only receive messages with filter value banana and subject yellow
91+
stream_consumer_options=StreamConsumerOptions(
92+
offset_specification=OffsetSpecification.first,
93+
filter_options=StreamFilterOptions(
94+
values=["banana"],
95+
message_properties=MessageProperties(
96+
subject="yellow",
97+
),
98+
),
99+
),
100+
)
101+
print(
102+
"create a consumer and consume the test message - press control + c to terminate to consume"
103+
)
104+
105+
# print("create a publisher and publish a test message")
106+
publisher = connection.publisher(addr_queue)
107+
108+
# publish with a filter of apple
109+
for i in range(MESSAGES_TO_PUBLISH):
110+
color = "green" if i % 2 == 0 else "yellow"
111+
publisher.publish(
112+
Message(
113+
Converter.string_to_bytes(body="apple: " + str(i)),
114+
annotations={"x-stream-filter-value": "apple"},
115+
subject=color,
116+
)
117+
)
118+
119+
time.sleep(0.5) # wait a bit to ensure messages are published in different chunks
120+
121+
# publish with a filter of banana
122+
for i in range(MESSAGES_TO_PUBLISH):
123+
color = "green" if i % 2 == 0 else "yellow"
124+
publisher.publish(
125+
Message(
126+
body=Converter.string_to_bytes("banana: " + str(i)),
127+
annotations={"x-stream-filter-value": "banana"},
128+
subject=color,
129+
)
130+
)
131+
132+
publisher.close()
133+
134+
while True:
135+
try:
136+
consumer.run()
137+
except KeyboardInterrupt:
138+
pass
139+
except ConnectionClosed:
140+
print("connection closed")
141+
continue
142+
except Exception as e:
143+
print("consumer exited for exception " + str(e))
144+
145+
break
146+
147+
#
148+
logger.info("consumer exited, deleting queue")
149+
management.delete_queue(queue_name)
150+
151+
print("closing connections")
152+
management.close()
153+
print("after management closing")
154+
environment.close()
155+
print("after connection closing")
156+
157+
158+
if __name__ == "__main__":
159+
main()

rabbitmq_amqp_python_client/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
ExchangeSpecification,
1111
ExchangeToExchangeBindingSpecification,
1212
ExchangeToQueueBindingSpecification,
13+
MessageProperties,
1314
OAuth2Options,
1415
OffsetSpecification,
1516
RecoveryConfiguration,
1617
StreamConsumerOptions,
18+
StreamFilterOptions,
1719
)
1820
from .environment import Environment
1921
from .exceptions import (
@@ -86,6 +88,8 @@
8688
"PKCS12Store",
8789
"ConnectionClosed",
8890
"StreamConsumerOptions",
91+
"StreamFilterOptions",
92+
"MessageProperties",
8993
"OffsetSpecification",
9094
"OutcomeState",
9195
"Environment",

rabbitmq_amqp_python_client/entities.py

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass, field
2-
from datetime import timedelta
2+
from datetime import datetime, timedelta
33
from enum import Enum
44
from typing import Any, Dict, Optional, Union
55

@@ -10,6 +10,7 @@
1010
STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
1111
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
1212
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
13+
AMQP_PROPERTIES_FILTER = "amqp:properties-filter"
1314

1415

1516
@dataclass
@@ -149,6 +150,42 @@ class ExchangeToExchangeBindingSpecification:
149150
binding_key: Optional[str] = None
150151

151152

153+
@dataclass
154+
class MessageProperties:
155+
"""
156+
Properties for an AMQP message.
157+
158+
Attributes:
159+
message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str).
160+
user_id: Identity of the user responsible for producing the message.
161+
to: Intended destination node of the message.
162+
subject: Summary information about the message content and purpose.
163+
reply_to: Address of the node to send replies to.
164+
correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str).
165+
content_type: RFC-2046 MIME type for the message's body.
166+
content_encoding: Modifier to the content-type.
167+
absolute_expiry_time: Absolute time when the message expires.
168+
creation_time: Absolute time when the message was created.
169+
group_id: Group the message belongs to.
170+
group_sequence: Relative position of this message within its group.
171+
reply_to_group_id: Id for sending replies to a specific group.
172+
"""
173+
174+
message_id: Optional[Union[int, str, bytes]] = None
175+
user_id: Optional[bytes] = None
176+
to: Optional[str] = None
177+
subject: Optional[str] = None
178+
reply_to: Optional[str] = None
179+
correlation_id: Optional[Union[int, str, bytes]] = None
180+
content_type: Optional[str] = None
181+
content_encoding: Optional[str] = None
182+
absolute_expiry_time: Optional[datetime] = None
183+
creation_time: Optional[datetime] = None
184+
group_id: Optional[str] = None
185+
group_sequence: Optional[int] = None
186+
reply_to_group_id: Optional[str] = None
187+
188+
152189
"""
153190
StreamFilterOptions defines the filtering options for a stream consumer.
154191
for values and match_unfiltered see: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
@@ -159,18 +196,21 @@ class StreamFilterOptions:
159196
values: Optional[list[str]] = None
160197
match_unfiltered: bool = False
161198
application_properties: Optional[dict[str, Any]] = None
199+
message_properties: Optional[MessageProperties] = None
162200
sql: str = ""
163201

164202
def __init__(
165203
self,
166204
values: Optional[list[str]] = None,
167205
match_unfiltered: bool = False,
168206
application_properties: Optional[dict[str, Any]] = None,
207+
message_properties: Optional[MessageProperties] = None,
169208
sql: str = "",
170209
):
171210
self.values = values
172211
self.match_unfiltered = match_unfiltered
173212
self.application_properties = application_properties
213+
self.message_properties = message_properties
174214
self.sql = sql
175215

176216

@@ -195,27 +235,23 @@ def __init__(
195235
filter_options: Optional[StreamFilterOptions] = None,
196236
):
197237

198-
self.streamFilterOptions = filter_options
238+
self._filter_set: Dict[symbol, Described] = {}
199239

200-
if offset_specification is None and self.streamFilterOptions is None:
240+
if offset_specification is None and filter_options is None:
201241
raise ValidationCodeException(
202242
"At least one between offset_specification and filters must be set when setting up filtering"
203243
)
204-
self._filter_set: Dict[symbol, Described] = {}
205244
if offset_specification is not None:
206245
self._offset(offset_specification)
207246

208-
if (
209-
self.streamFilterOptions is not None
210-
and self.streamFilterOptions.values is not None
211-
):
212-
self._filter_values(self.streamFilterOptions.values)
247+
if filter_options is not None and filter_options.values is not None:
248+
self._filter_values(filter_options.values)
213249

214-
if (
215-
self.streamFilterOptions is not None
216-
and self.streamFilterOptions.match_unfiltered
217-
):
218-
self._filter_match_unfiltered(self.streamFilterOptions.match_unfiltered)
250+
if filter_options is not None and filter_options.match_unfiltered:
251+
self._filter_match_unfiltered(filter_options.match_unfiltered)
252+
253+
if filter_options is not None and filter_options.message_properties is not None:
254+
self._filter_message_properties(filter_options.message_properties)
219255

220256
def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
221257
"""
@@ -257,6 +293,29 @@ def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None:
257293
symbol(STREAM_FILTER_MATCH_UNFILTERED), filter_match_unfiltered
258294
)
259295

296+
def _filter_message_properties(
297+
self, message_properties: Optional[MessageProperties]
298+
) -> None:
299+
"""
300+
Set AMQP message properties for filtering.
301+
302+
Args:
303+
message_properties: MessageProperties object containing AMQP message properties
304+
"""
305+
if message_properties is not None:
306+
# dictionary of symbols and described
307+
filter_prop: Dict[symbol, Any] = {}
308+
309+
for key, value in message_properties.__dict__.items():
310+
if value is not None:
311+
# replace _ with - for the key
312+
filter_prop[symbol(key.replace("_", "-"))] = value
313+
314+
if len(filter_prop) > 0:
315+
self._filter_set[symbol(AMQP_PROPERTIES_FILTER)] = Described(
316+
symbol(AMQP_PROPERTIES_FILTER), filter_prop
317+
)
318+
260319
def filter_set(self) -> Dict[symbol, Described]:
261320
"""
262321
Get the current filter set configuration.

0 commit comments

Comments
 (0)